clolov commented on code in PR #14505: URL: https://github.com/apache/kafka/pull/14505#discussion_r1352038137
########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java: ########## @@ -409,309 +430,266 @@ public void testAssignmentPauseResume() throws Exception { } sinkTaskContext.getValue().pause(TOPIC_PARTITION, TOPIC_PARTITION2); return null; - }); - consumer.pause(Arrays.asList(UNASSIGNED_TOPIC_PARTITION)); - PowerMock.expectLastCall().andThrow(new IllegalStateException("unassigned topic partition")); - consumer.pause(Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2)); - PowerMock.expectLastCall(); - - expectOnePoll().andAnswer(() -> { + }).doAnswer(invocation -> { try { sinkTaskContext.getValue().resume(UNASSIGNED_TOPIC_PARTITION); fail("Trying to resume unassigned partition should have thrown an Connect exception"); } catch (ConnectException e) { // expected } - sinkTaskContext.getValue().resume(TOPIC_PARTITION, TOPIC_PARTITION2); return null; - }); - consumer.resume(Arrays.asList(UNASSIGNED_TOPIC_PARTITION)); - PowerMock.expectLastCall().andThrow(new IllegalStateException("unassigned topic partition")); - consumer.resume(Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2)); - PowerMock.expectLastCall(); + }).when(sinkTask).put(any(Collection.class)); - expectStopTask(); + doThrow(new IllegalStateException("unassigned topic partition")).when(consumer).pause(singletonList(UNASSIGNED_TOPIC_PARTITION)); + doAnswer(invocation -> null).when(consumer).pause(Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2)); - PowerMock.replayAll(); + doThrow(new IllegalStateException("unassigned topic partition")).when(consumer).resume(singletonList(UNASSIGNED_TOPIC_PARTITION)); + doAnswer(invocation -> null).when(consumer).resume(Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2)); workerTask.initialize(TASK_CONFIG); workerTask.initializeAndStart(); + verifyInitializeTask(); + workerTask.iteration(); + verifyInitialAssignment(); + workerTask.iteration(); workerTask.iteration(); workerTask.iteration(); workerTask.stop(); workerTask.close(); + verifyStopTask(); + verifyTaskGetTopic(3); - PowerMock.verifyAll(); + verify(consumer, atLeastOnce()).pause(Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2)); + verify(consumer, atLeastOnce()).resume(Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2)); } @Test - public void testRewind() throws Exception { - expectInitializeTask(); - expectTaskGetTopic(true); - expectPollInitialAssignment(); + public void testRewind() { + expectTaskGetTopic(); + expectInitialAssignment(); + expectPolls(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_DEFAULT); final long startOffset = 40L; final Map<TopicPartition, Long> offsets = new HashMap<>(); - expectOnePoll().andAnswer(() -> { + doAnswer(invocation -> { + return null; // initial assignment + }).doAnswer(invocation -> { offsets.put(TOPIC_PARTITION, startOffset); sinkTaskContext.getValue().offset(offsets); return null; - }); - - consumer.seek(TOPIC_PARTITION, startOffset); - EasyMock.expectLastCall(); - - expectOnePoll().andAnswer(() -> { + }).doAnswer(invocation -> { Map<TopicPartition, Long> offsets1 = sinkTaskContext.getValue().offsets(); assertEquals(0, offsets1.size()); return null; - }); - - expectStopTask(); - PowerMock.replayAll(); + }).when(sinkTask).put(any(Collection.class)); workerTask.initialize(TASK_CONFIG); workerTask.initializeAndStart(); + verifyInitializeTask(); + workerTask.iteration(); + verifyInitialAssignment(); + workerTask.iteration(); workerTask.iteration(); + verify(consumer).seek(TOPIC_PARTITION, startOffset); + workerTask.stop(); workerTask.close(); - - PowerMock.verifyAll(); + verifyStopTask(); + verifyTaskGetTopic(2); } @Test - public void testRewindOnRebalanceDuringPoll() throws Exception { - expectInitializeTask(); - expectTaskGetTopic(true); - expectPollInitialAssignment(); - expectConsumerAssignment(INITIAL_ASSIGNMENT).times(2); + public void testRewindOnRebalanceDuringPoll() { + final long startOffset = 40L; + + expectTaskGetTopic(); + expectInitialAssignment(); + expectRebalanceDuringPoll(startOffset); - expectRebalanceDuringPoll().andAnswer(() -> { + doAnswer(invocation -> { + return null; // initial assignment + }).doAnswer(invocation -> { Map<TopicPartition, Long> offsets = sinkTaskContext.getValue().offsets(); assertEquals(0, offsets.size()); return null; - }); - expectStopTask(); - PowerMock.replayAll(); + }).when(sinkTask).put(any(Collection.class)); workerTask.initialize(TASK_CONFIG); workerTask.initializeAndStart(); + verifyInitializeTask(); + workerTask.iteration(); + verifyInitialAssignment(); + workerTask.iteration(); + verify(consumer).seek(TOPIC_PARTITION, startOffset); + workerTask.stop(); workerTask.close(); - - PowerMock.verifyAll(); + verifyStopTask(); + verifyTaskGetTopic(1); } - private void expectInitializeTask() { - - consumer.subscribe(EasyMock.eq(Arrays.asList(TOPIC)), EasyMock.capture(rebalanceListener)); - PowerMock.expectLastCall(); - - sinkTask.initialize(EasyMock.capture(sinkTaskContext)); - PowerMock.expectLastCall(); - sinkTask.start(TASK_PROPS); - PowerMock.expectLastCall(); + private void verifyInitializeTask() { + verify(consumer).subscribe(eq(singletonList(TOPIC)), rebalanceListener.capture()); + verify(sinkTask).initialize(sinkTaskContext.capture()); + verify(sinkTask).start(TASK_PROPS); } - private void expectPollInitialAssignment() { - expectConsumerAssignment(INITIAL_ASSIGNMENT).times(2); - sinkTask.open(INITIAL_ASSIGNMENT); - EasyMock.expectLastCall(); - - EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong()))).andAnswer(() -> { - rebalanceListener.getValue().onPartitionsAssigned(INITIAL_ASSIGNMENT); - return ConsumerRecords.empty(); - }); - EasyMock.expect(consumer.position(TOPIC_PARTITION)).andReturn(FIRST_OFFSET); - EasyMock.expect(consumer.position(TOPIC_PARTITION2)).andReturn(FIRST_OFFSET); - EasyMock.expect(consumer.position(TOPIC_PARTITION3)).andReturn(FIRST_OFFSET); - - sinkTask.put(Collections.emptyList()); - EasyMock.expectLastCall(); + private void expectInitialAssignment() { + when(consumer.assignment()).thenReturn(INITIAL_ASSIGNMENT); + INITIAL_ASSIGNMENT.forEach(tp -> when(consumer.position(tp)).thenReturn(FIRST_OFFSET)); } - private IExpectationSetters<Set<TopicPartition>> expectConsumerAssignment(Set<TopicPartition> assignment) { - return EasyMock.expect(consumer.assignment()).andReturn(assignment); + private void verifyInitialAssignment() { + verify(sinkTask).open(INITIAL_ASSIGNMENT); + verify(sinkTask).put(Collections.emptyList()); } - private void expectStopTask() { - sinkTask.stop(); - PowerMock.expectLastCall(); + private void verifyStopTask() { + verify(sinkTask).stop(); // No offset commit since it happens in the mocked worker thread, but the main thread does need to wake up the // consumer so it exits quickly - consumer.wakeup(); - PowerMock.expectLastCall(); + verify(consumer).wakeup(); - consumer.close(); - PowerMock.expectLastCall(); + verify(consumer).close(); try { - headerConverter.close(); + verify(headerConverter).close(); } catch (IOException e) { throw new RuntimeException(e); } - PowerMock.expectLastCall(); } // Note that this can only be called once per test currently - private Capture<Collection<SinkRecord>> expectPolls(final long pollDelayMs) { + private void expectPolls(final long pollDelayMs) { // Stub out all the consumer stream/iterator responses, which we just want to verify occur, // but don't care about the exact details here. - EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong()))).andStubAnswer( - () -> { - // "Sleep" so time will progress - time.sleep(pollDelayMs); - ConsumerRecords<byte[], byte[]> records = new ConsumerRecords<>( - Collections.singletonMap( - new TopicPartition(TOPIC, PARTITION), - Arrays.asList(new ConsumerRecord<>(TOPIC, PARTITION, FIRST_OFFSET + recordsReturned, TIMESTAMP, TIMESTAMP_TYPE, - 0, 0, RAW_KEY, RAW_VALUE, new RecordHeaders(), Optional.empty())))); - recordsReturned++; - return records; - }); - EasyMock.expect(keyConverter.toConnectData(TOPIC, emptyHeaders(), RAW_KEY)).andReturn(new SchemaAndValue(KEY_SCHEMA, KEY)).anyTimes(); - EasyMock.expect(valueConverter.toConnectData(TOPIC, emptyHeaders(), RAW_VALUE)).andReturn(new SchemaAndValue(VALUE_SCHEMA, VALUE)).anyTimes(); - - final Capture<SinkRecord> recordCapture = EasyMock.newCapture(); - EasyMock.expect(transformationChain.apply(EasyMock.capture(recordCapture))).andAnswer( - recordCapture::getValue).anyTimes(); - - Capture<Collection<SinkRecord>> capturedRecords = EasyMock.newCapture(CaptureType.ALL); - sinkTask.put(EasyMock.capture(capturedRecords)); - EasyMock.expectLastCall().anyTimes(); - return capturedRecords; - } - - @SuppressWarnings("unchecked") - private IExpectationSetters<Object> expectOnePoll() { - // Currently the SinkTask's put() method will not be invoked unless we provide some data, so instead of - // returning empty data, we return one record. The expectation is that the data will be ignored by the - // response behavior specified using the return value of this method. - EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong()))).andAnswer( - () -> { - // "Sleep" so time will progress - time.sleep(1L); - ConsumerRecords<byte[], byte[]> records = new ConsumerRecords<>( - Collections.singletonMap( - new TopicPartition(TOPIC, PARTITION), - Arrays.asList(new ConsumerRecord<>(TOPIC, PARTITION, FIRST_OFFSET + recordsReturned, TIMESTAMP, TIMESTAMP_TYPE, - 0, 0, RAW_KEY, RAW_VALUE, new RecordHeaders(), Optional.empty())))); - recordsReturned++; - return records; - }); - EasyMock.expect(keyConverter.toConnectData(TOPIC, emptyHeaders(), RAW_KEY)).andReturn(new SchemaAndValue(KEY_SCHEMA, KEY)); - EasyMock.expect(valueConverter.toConnectData(TOPIC, emptyHeaders(), RAW_VALUE)).andReturn(new SchemaAndValue(VALUE_SCHEMA, VALUE)); - sinkTask.put(EasyMock.anyObject(Collection.class)); - return EasyMock.expectLastCall(); + when(consumer.poll(any(Duration.class))).thenAnswer(invocation -> { + rebalanceListener.getValue().onPartitionsAssigned(INITIAL_ASSIGNMENT); + return ConsumerRecords.empty(); + }).thenAnswer((Answer<ConsumerRecords<byte[], byte[]>>) invocation -> { + // "Sleep" so time will progress + time.sleep(pollDelayMs); + + TopicPartition topicPartition = new TopicPartition(TOPIC, PARTITION); + ConsumerRecord<byte[], byte[]> consumerRecord = new ConsumerRecord<>(TOPIC, PARTITION, FIRST_OFFSET + recordsReturned, TIMESTAMP, TIMESTAMP_TYPE, 0, 0, RAW_KEY, RAW_VALUE, emptyHeaders(), Optional.empty()); + ConsumerRecords<byte[], byte[]> records = new ConsumerRecords<>(Collections.singletonMap(topicPartition, singletonList(consumerRecord))); + recordsReturned++; + return records; + }); + when(keyConverter.toConnectData(TOPIC, emptyHeaders(), RAW_KEY)).thenReturn(new SchemaAndValue(KEY_SCHEMA, KEY)); + when(valueConverter.toConnectData(TOPIC, emptyHeaders(), RAW_VALUE)).thenReturn(new SchemaAndValue(VALUE_SCHEMA, VALUE)); + when(transformationChain.apply(any(SinkRecord.class))).thenAnswer(AdditionalAnswers.returnsFirstArg()); } - @SuppressWarnings("unchecked") - private IExpectationSetters<Object> expectRebalanceDuringPoll() { + // @SuppressWarnings("unchecked") Review Comment: If this is unneeded can you remove it? ########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java: ########## @@ -409,309 +430,266 @@ public void testAssignmentPauseResume() throws Exception { } sinkTaskContext.getValue().pause(TOPIC_PARTITION, TOPIC_PARTITION2); return null; - }); - consumer.pause(Arrays.asList(UNASSIGNED_TOPIC_PARTITION)); - PowerMock.expectLastCall().andThrow(new IllegalStateException("unassigned topic partition")); - consumer.pause(Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2)); - PowerMock.expectLastCall(); - - expectOnePoll().andAnswer(() -> { + }).doAnswer(invocation -> { try { sinkTaskContext.getValue().resume(UNASSIGNED_TOPIC_PARTITION); fail("Trying to resume unassigned partition should have thrown an Connect exception"); } catch (ConnectException e) { // expected } - sinkTaskContext.getValue().resume(TOPIC_PARTITION, TOPIC_PARTITION2); return null; - }); - consumer.resume(Arrays.asList(UNASSIGNED_TOPIC_PARTITION)); - PowerMock.expectLastCall().andThrow(new IllegalStateException("unassigned topic partition")); - consumer.resume(Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2)); - PowerMock.expectLastCall(); + }).when(sinkTask).put(any(Collection.class)); - expectStopTask(); + doThrow(new IllegalStateException("unassigned topic partition")).when(consumer).pause(singletonList(UNASSIGNED_TOPIC_PARTITION)); + doAnswer(invocation -> null).when(consumer).pause(Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2)); - PowerMock.replayAll(); + doThrow(new IllegalStateException("unassigned topic partition")).when(consumer).resume(singletonList(UNASSIGNED_TOPIC_PARTITION)); + doAnswer(invocation -> null).when(consumer).resume(Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2)); workerTask.initialize(TASK_CONFIG); workerTask.initializeAndStart(); + verifyInitializeTask(); + workerTask.iteration(); + verifyInitialAssignment(); + workerTask.iteration(); workerTask.iteration(); workerTask.iteration(); workerTask.stop(); workerTask.close(); + verifyStopTask(); + verifyTaskGetTopic(3); - PowerMock.verifyAll(); + verify(consumer, atLeastOnce()).pause(Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2)); + verify(consumer, atLeastOnce()).resume(Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2)); } @Test - public void testRewind() throws Exception { - expectInitializeTask(); - expectTaskGetTopic(true); - expectPollInitialAssignment(); + public void testRewind() { + expectTaskGetTopic(); + expectInitialAssignment(); + expectPolls(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_DEFAULT); final long startOffset = 40L; final Map<TopicPartition, Long> offsets = new HashMap<>(); - expectOnePoll().andAnswer(() -> { + doAnswer(invocation -> { + return null; // initial assignment + }).doAnswer(invocation -> { offsets.put(TOPIC_PARTITION, startOffset); sinkTaskContext.getValue().offset(offsets); return null; - }); - - consumer.seek(TOPIC_PARTITION, startOffset); - EasyMock.expectLastCall(); - - expectOnePoll().andAnswer(() -> { + }).doAnswer(invocation -> { Map<TopicPartition, Long> offsets1 = sinkTaskContext.getValue().offsets(); assertEquals(0, offsets1.size()); return null; - }); - - expectStopTask(); - PowerMock.replayAll(); + }).when(sinkTask).put(any(Collection.class)); workerTask.initialize(TASK_CONFIG); workerTask.initializeAndStart(); + verifyInitializeTask(); + workerTask.iteration(); + verifyInitialAssignment(); + workerTask.iteration(); workerTask.iteration(); + verify(consumer).seek(TOPIC_PARTITION, startOffset); + workerTask.stop(); workerTask.close(); - - PowerMock.verifyAll(); + verifyStopTask(); + verifyTaskGetTopic(2); } @Test - public void testRewindOnRebalanceDuringPoll() throws Exception { - expectInitializeTask(); - expectTaskGetTopic(true); - expectPollInitialAssignment(); - expectConsumerAssignment(INITIAL_ASSIGNMENT).times(2); + public void testRewindOnRebalanceDuringPoll() { + final long startOffset = 40L; + + expectTaskGetTopic(); + expectInitialAssignment(); + expectRebalanceDuringPoll(startOffset); - expectRebalanceDuringPoll().andAnswer(() -> { + doAnswer(invocation -> { + return null; // initial assignment + }).doAnswer(invocation -> { Map<TopicPartition, Long> offsets = sinkTaskContext.getValue().offsets(); assertEquals(0, offsets.size()); return null; - }); - expectStopTask(); - PowerMock.replayAll(); + }).when(sinkTask).put(any(Collection.class)); workerTask.initialize(TASK_CONFIG); workerTask.initializeAndStart(); + verifyInitializeTask(); + workerTask.iteration(); + verifyInitialAssignment(); + workerTask.iteration(); + verify(consumer).seek(TOPIC_PARTITION, startOffset); + workerTask.stop(); workerTask.close(); - - PowerMock.verifyAll(); + verifyStopTask(); + verifyTaskGetTopic(1); } - private void expectInitializeTask() { - - consumer.subscribe(EasyMock.eq(Arrays.asList(TOPIC)), EasyMock.capture(rebalanceListener)); - PowerMock.expectLastCall(); - - sinkTask.initialize(EasyMock.capture(sinkTaskContext)); - PowerMock.expectLastCall(); - sinkTask.start(TASK_PROPS); - PowerMock.expectLastCall(); + private void verifyInitializeTask() { + verify(consumer).subscribe(eq(singletonList(TOPIC)), rebalanceListener.capture()); + verify(sinkTask).initialize(sinkTaskContext.capture()); + verify(sinkTask).start(TASK_PROPS); } - private void expectPollInitialAssignment() { - expectConsumerAssignment(INITIAL_ASSIGNMENT).times(2); - sinkTask.open(INITIAL_ASSIGNMENT); - EasyMock.expectLastCall(); - - EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong()))).andAnswer(() -> { - rebalanceListener.getValue().onPartitionsAssigned(INITIAL_ASSIGNMENT); - return ConsumerRecords.empty(); - }); - EasyMock.expect(consumer.position(TOPIC_PARTITION)).andReturn(FIRST_OFFSET); - EasyMock.expect(consumer.position(TOPIC_PARTITION2)).andReturn(FIRST_OFFSET); - EasyMock.expect(consumer.position(TOPIC_PARTITION3)).andReturn(FIRST_OFFSET); - - sinkTask.put(Collections.emptyList()); - EasyMock.expectLastCall(); + private void expectInitialAssignment() { + when(consumer.assignment()).thenReturn(INITIAL_ASSIGNMENT); + INITIAL_ASSIGNMENT.forEach(tp -> when(consumer.position(tp)).thenReturn(FIRST_OFFSET)); } - private IExpectationSetters<Set<TopicPartition>> expectConsumerAssignment(Set<TopicPartition> assignment) { - return EasyMock.expect(consumer.assignment()).andReturn(assignment); + private void verifyInitialAssignment() { + verify(sinkTask).open(INITIAL_ASSIGNMENT); + verify(sinkTask).put(Collections.emptyList()); } - private void expectStopTask() { - sinkTask.stop(); - PowerMock.expectLastCall(); + private void verifyStopTask() { + verify(sinkTask).stop(); // No offset commit since it happens in the mocked worker thread, but the main thread does need to wake up the // consumer so it exits quickly - consumer.wakeup(); - PowerMock.expectLastCall(); + verify(consumer).wakeup(); - consumer.close(); - PowerMock.expectLastCall(); + verify(consumer).close(); try { - headerConverter.close(); + verify(headerConverter).close(); } catch (IOException e) { throw new RuntimeException(e); } - PowerMock.expectLastCall(); } // Note that this can only be called once per test currently - private Capture<Collection<SinkRecord>> expectPolls(final long pollDelayMs) { + private void expectPolls(final long pollDelayMs) { // Stub out all the consumer stream/iterator responses, which we just want to verify occur, // but don't care about the exact details here. - EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong()))).andStubAnswer( - () -> { - // "Sleep" so time will progress - time.sleep(pollDelayMs); - ConsumerRecords<byte[], byte[]> records = new ConsumerRecords<>( - Collections.singletonMap( - new TopicPartition(TOPIC, PARTITION), - Arrays.asList(new ConsumerRecord<>(TOPIC, PARTITION, FIRST_OFFSET + recordsReturned, TIMESTAMP, TIMESTAMP_TYPE, - 0, 0, RAW_KEY, RAW_VALUE, new RecordHeaders(), Optional.empty())))); - recordsReturned++; - return records; - }); - EasyMock.expect(keyConverter.toConnectData(TOPIC, emptyHeaders(), RAW_KEY)).andReturn(new SchemaAndValue(KEY_SCHEMA, KEY)).anyTimes(); - EasyMock.expect(valueConverter.toConnectData(TOPIC, emptyHeaders(), RAW_VALUE)).andReturn(new SchemaAndValue(VALUE_SCHEMA, VALUE)).anyTimes(); - - final Capture<SinkRecord> recordCapture = EasyMock.newCapture(); - EasyMock.expect(transformationChain.apply(EasyMock.capture(recordCapture))).andAnswer( - recordCapture::getValue).anyTimes(); - - Capture<Collection<SinkRecord>> capturedRecords = EasyMock.newCapture(CaptureType.ALL); - sinkTask.put(EasyMock.capture(capturedRecords)); - EasyMock.expectLastCall().anyTimes(); - return capturedRecords; - } - - @SuppressWarnings("unchecked") - private IExpectationSetters<Object> expectOnePoll() { - // Currently the SinkTask's put() method will not be invoked unless we provide some data, so instead of - // returning empty data, we return one record. The expectation is that the data will be ignored by the - // response behavior specified using the return value of this method. - EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong()))).andAnswer( - () -> { - // "Sleep" so time will progress - time.sleep(1L); - ConsumerRecords<byte[], byte[]> records = new ConsumerRecords<>( - Collections.singletonMap( - new TopicPartition(TOPIC, PARTITION), - Arrays.asList(new ConsumerRecord<>(TOPIC, PARTITION, FIRST_OFFSET + recordsReturned, TIMESTAMP, TIMESTAMP_TYPE, - 0, 0, RAW_KEY, RAW_VALUE, new RecordHeaders(), Optional.empty())))); - recordsReturned++; - return records; - }); - EasyMock.expect(keyConverter.toConnectData(TOPIC, emptyHeaders(), RAW_KEY)).andReturn(new SchemaAndValue(KEY_SCHEMA, KEY)); - EasyMock.expect(valueConverter.toConnectData(TOPIC, emptyHeaders(), RAW_VALUE)).andReturn(new SchemaAndValue(VALUE_SCHEMA, VALUE)); - sinkTask.put(EasyMock.anyObject(Collection.class)); - return EasyMock.expectLastCall(); + when(consumer.poll(any(Duration.class))).thenAnswer(invocation -> { + rebalanceListener.getValue().onPartitionsAssigned(INITIAL_ASSIGNMENT); + return ConsumerRecords.empty(); + }).thenAnswer((Answer<ConsumerRecords<byte[], byte[]>>) invocation -> { + // "Sleep" so time will progress + time.sleep(pollDelayMs); + + TopicPartition topicPartition = new TopicPartition(TOPIC, PARTITION); + ConsumerRecord<byte[], byte[]> consumerRecord = new ConsumerRecord<>(TOPIC, PARTITION, FIRST_OFFSET + recordsReturned, TIMESTAMP, TIMESTAMP_TYPE, 0, 0, RAW_KEY, RAW_VALUE, emptyHeaders(), Optional.empty()); + ConsumerRecords<byte[], byte[]> records = new ConsumerRecords<>(Collections.singletonMap(topicPartition, singletonList(consumerRecord))); + recordsReturned++; + return records; + }); + when(keyConverter.toConnectData(TOPIC, emptyHeaders(), RAW_KEY)).thenReturn(new SchemaAndValue(KEY_SCHEMA, KEY)); + when(valueConverter.toConnectData(TOPIC, emptyHeaders(), RAW_VALUE)).thenReturn(new SchemaAndValue(VALUE_SCHEMA, VALUE)); + when(transformationChain.apply(any(SinkRecord.class))).thenAnswer(AdditionalAnswers.returnsFirstArg()); } - @SuppressWarnings("unchecked") - private IExpectationSetters<Object> expectRebalanceDuringPoll() { + // @SuppressWarnings("unchecked") + @SuppressWarnings("SameParameterValue") + private void expectRebalanceDuringPoll(long startOffset) { final List<TopicPartition> partitions = Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2, TOPIC_PARTITION3); - final long startOffset = 40L; final Map<TopicPartition, Long> offsets = new HashMap<>(); offsets.put(TOPIC_PARTITION, startOffset); - EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong()))).andAnswer( - () -> { - // "Sleep" so time will progress - time.sleep(1L); - - sinkTaskContext.getValue().offset(offsets); - rebalanceListener.getValue().onPartitionsAssigned(partitions); - - ConsumerRecords<byte[], byte[]> records = new ConsumerRecords<>( - Collections.singletonMap( - new TopicPartition(TOPIC, PARTITION), - Arrays.asList(new ConsumerRecord<>(TOPIC, PARTITION, FIRST_OFFSET + recordsReturned, TIMESTAMP, TIMESTAMP_TYPE, - 0, 0, RAW_KEY, RAW_VALUE, new RecordHeaders(), Optional.empty()) - ))); - recordsReturned++; - return records; - }); - - EasyMock.expect(consumer.position(TOPIC_PARTITION)).andReturn(FIRST_OFFSET); - EasyMock.expect(consumer.position(TOPIC_PARTITION2)).andReturn(FIRST_OFFSET); - EasyMock.expect(consumer.position(TOPIC_PARTITION3)).andReturn(FIRST_OFFSET); - - sinkTask.open(partitions); - EasyMock.expectLastCall(); - - consumer.seek(TOPIC_PARTITION, startOffset); - EasyMock.expectLastCall(); - - EasyMock.expect(keyConverter.toConnectData(TOPIC, emptyHeaders(), RAW_KEY)).andReturn(new SchemaAndValue(KEY_SCHEMA, KEY)); - EasyMock.expect(valueConverter.toConnectData(TOPIC, emptyHeaders(), RAW_VALUE)).andReturn(new SchemaAndValue(VALUE_SCHEMA, VALUE)); - sinkTask.put(EasyMock.anyObject(Collection.class)); - return EasyMock.expectLastCall(); + when(consumer.poll(any(Duration.class))).thenAnswer(invocation -> { + rebalanceListener.getValue().onPartitionsAssigned(INITIAL_ASSIGNMENT); + return ConsumerRecords.empty(); + }).thenAnswer((Answer<ConsumerRecords<byte[], byte[]>>) invocation -> { + // "Sleep" so time will progress + time.sleep(1L); + + sinkTaskContext.getValue().offset(offsets); + rebalanceListener.getValue().onPartitionsAssigned(partitions); + + TopicPartition topicPartition = new TopicPartition(TOPIC, PARTITION); + ConsumerRecord<byte[], byte[]> consumerRecord = new ConsumerRecord<>( + TOPIC, PARTITION, FIRST_OFFSET + recordsReturned, TIMESTAMP, TIMESTAMP_TYPE, + 0, 0, RAW_KEY, RAW_VALUE, emptyHeaders(), Optional.empty()); + ConsumerRecords<byte[], byte[]> records = + new ConsumerRecords<>(Collections.singletonMap(topicPartition, singletonList(consumerRecord))); + recordsReturned++; + return records; + }); + + when(keyConverter.toConnectData(TOPIC, emptyHeaders(), RAW_KEY)).thenReturn(new SchemaAndValue(KEY_SCHEMA, KEY)); + when(valueConverter.toConnectData(TOPIC, emptyHeaders(), RAW_VALUE)).thenReturn(new SchemaAndValue(VALUE_SCHEMA, VALUE)); } - private Capture<OffsetCommitCallback> expectOffsetCommit(final long expectedMessages, - final RuntimeException error, - final Exception consumerCommitError, - final long consumerCommitDelayMs, - final boolean invokeCallback) { + @SuppressWarnings("rawtypes") Review Comment: For my understanding, what in the function is causing this warning? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org