This is an automated email from the ASF dual-hosted git repository.
rabreu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git
The following commit(s) were added to refs/heads/master by this push:
new 68810c79e KafkaSpout: moving to poll(Duration) due to deprecation of poll(long)
new c93795ca9 Merge pull request #3612 from reiabreu/master
68810c79e is described below
commit 68810c79e6dbb836e98093f98f67f6e874f1b0a8
Author: Rui Abreu <[email protected]>
AuthorDate: Wed Dec 20 16:18:46 2023 +0000
KafkaSpout: moving to poll(Duration) due to deprecation of poll(long)
---
.../java/org/apache/storm/kafka/spout/KafkaSpout.java | 4 +++-
.../apache/storm/kafka/spout/KafkaSpoutEmitTest.java | 13 +++++++------
.../spout/KafkaSpoutLogCompactionSupportTest.java | 9 +++++----
.../kafka/spout/KafkaSpoutMessagingGuaranteeTest.java | 19 ++++++++++---------
.../storm/kafka/spout/KafkaSpoutRebalanceTest.java | 3 ++-
.../storm/kafka/spout/KafkaSpoutRetryLimitTest.java | 7 ++++---
.../spout/SpoutWithMockedConsumerSetupHelper.java | 4 ++--
7 files changed, 33 insertions(+), 26 deletions(-)
diff --git
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
index 89c021d7f..aeff50cc1 100644
---
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
+++
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
@@ -24,6 +24,8 @@ import static
org.apache.storm.kafka.spout.FirstPollOffsetStrategy.UNCOMMITTED_E
import static
org.apache.storm.kafka.spout.FirstPollOffsetStrategy.UNCOMMITTED_LATEST;
import com.google.common.annotations.VisibleForTesting;
+
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -355,7 +357,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
pausedPartitions.removeIf(pollablePartitionsInfo.pollablePartitions::contains);
try {
consumer.pause(pausedPartitions);
- final ConsumerRecords<K, V> consumerRecords =
consumer.poll(kafkaSpoutConfig.getPollTimeoutMs());
+ final ConsumerRecords<K, V> consumerRecords =
consumer.poll(Duration.ofMillis(kafkaSpoutConfig.getPollTimeoutMs()));
ackRetriableOffsetsIfCompactedAway(pollablePartitionsInfo.pollableEarliestRetriableOffsets,
consumerRecords);
final int numPolledRecords = consumerRecords.count();
LOG.debug("Polled [{}] records from Kafka",
diff --git
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java
index 2c442c6cd..4d69633f1 100755
---
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java
+++
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java
@@ -29,6 +29,7 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.io.IOException;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -80,7 +81,7 @@ public class KafkaSpoutEmitTest {
Map<TopicPartition, List<ConsumerRecord<String, String>>> records =
new HashMap<>();
records.put(partition,
SpoutWithMockedConsumerSetupHelper.createRecords(partition, 0, 10));
- when(consumerMock.poll(anyLong()))
+ when(consumerMock.poll(any(Duration.class)))
.thenReturn(new ConsumerRecords<>(records));
spout.nextTuple();
@@ -100,7 +101,7 @@ public class KafkaSpoutEmitTest {
//This is cheating a bit since maxPollRecords would normally
spread this across multiple polls
records.put(partition,
SpoutWithMockedConsumerSetupHelper.createRecords(partition, 0, numRecords));
- when(consumerMock.poll(anyLong()))
+ when(consumerMock.poll(any(Duration.class)))
.thenReturn(new ConsumerRecords<>(records));
for (int i = 0; i < numRecords; i++) {
@@ -132,7 +133,7 @@ public class KafkaSpoutEmitTest {
}
InOrder inOrder = inOrder(consumerMock);
inOrder.verify(consumerMock).seek(partition, failedOffsets.get(0));
- inOrder.verify(consumerMock).poll(anyLong());
+ inOrder.verify(consumerMock).poll(any(Duration.class));
}
}
@@ -148,7 +149,7 @@ public class KafkaSpoutEmitTest {
records.put(partitionTwo,
SpoutWithMockedConsumerSetupHelper.createRecords(partitionTwo, 0,
spoutConfig.getMaxUncommittedOffsets() + 1));
int numMessages = spoutConfig.getMaxUncommittedOffsets()*2 + 1;
- when(consumerMock.poll(anyLong()))
+ when(consumerMock.poll(any(Duration.class)))
.thenReturn(new ConsumerRecords<>(records));
for (int i = 0; i < numMessages; i++) {
@@ -175,7 +176,7 @@ public class KafkaSpoutEmitTest {
reset(collectorMock);
Time.advanceTime(50);
- when(consumerMock.poll(anyLong()))
+ when(consumerMock.poll(any(Duration.class)))
.thenReturn(new
ConsumerRecords<>(Collections.singletonMap(partition,
SpoutWithMockedConsumerSetupHelper.createRecords(partition,
failedMessageIdPartitionOne.get().offset(), 1))));
spout.nextTuple();
@@ -187,7 +188,7 @@ public class KafkaSpoutEmitTest {
//Should not seek on the paused partition
inOrder.verify(consumerMock, never()).seek(eq(partitionTwo),
anyLong());
inOrder.verify(consumerMock).pause(Collections.singleton(partitionTwo));
- inOrder.verify(consumerMock).poll(anyLong());
+ inOrder.verify(consumerMock).poll(any(Duration.class));
inOrder.verify(consumerMock).resume(Collections.singleton(partitionTwo));
reset(collectorMock);
diff --git
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutLogCompactionSupportTest.java
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutLogCompactionSupportTest.java
index 961f6c564..a19bb9d26 100644
---
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutLogCompactionSupportTest.java
+++
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutLogCompactionSupportTest.java
@@ -22,7 +22,7 @@ import static org.hamcrest.Matchers.hasKey;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.anyList;
-import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
@@ -31,6 +31,7 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -86,7 +87,7 @@ public class KafkaSpoutLogCompactionSupportTest {
recordsForPartition.addAll(SpoutWithMockedConsumerSetupHelper.createRecords(partition,
8, 2));
records.put(partition, recordsForPartition);
- when(consumerMock.poll(anyLong()))
+ when(consumerMock.poll(any(Duration.class)))
.thenReturn(new ConsumerRecords<>(records));
for (int i = 0; i < recordsForPartition.size(); i++) {
@@ -102,13 +103,13 @@ public class KafkaSpoutLogCompactionSupportTest {
// Advance time and then trigger first call to kafka consumer
commit; the commit must progress to offset 9
Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + offsetCommitPeriodMs);
- when(consumerMock.poll(anyLong()))
+ when(consumerMock.poll(any(Duration.class)))
.thenReturn(new ConsumerRecords<>(Collections.emptyMap()));
spout.nextTuple();
InOrder inOrder = inOrder(consumerMock);
inOrder.verify(consumerMock).commitSync(commitCapture.capture());
- inOrder.verify(consumerMock).poll(anyLong());
+ inOrder.verify(consumerMock).poll(any(Duration.class));
//verify that Offset 10 was last committed offset, since this is
the offset the spout should resume at
Map<TopicPartition, OffsetAndMetadata> commits =
commitCapture.getValue();
diff --git
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java
index 386d46a19..44c5d0a03 100644
---
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java
+++
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java
@@ -36,6 +36,7 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@@ -85,14 +86,14 @@ public class KafkaSpoutMessagingGuaranteeTest {
.build();
KafkaSpout<String, String> spout =
SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock,
collectorMock, consumerMock, partition);
- when(consumerMock.poll(anyLong())).thenReturn(new
ConsumerRecords<>(Collections.singletonMap(partition,
+ when(consumerMock.poll(any(Duration.class))).thenReturn(new
ConsumerRecords<>(Collections.singletonMap(partition,
SpoutWithMockedConsumerSetupHelper.createRecords(partition, 0,
1))));
spout.nextTuple();
//The spout should have emitted the tuple, and must have committed it
before emit
InOrder inOrder = inOrder(consumerMock, collectorMock);
- inOrder.verify(consumerMock).poll(anyLong());
+ inOrder.verify(consumerMock).poll(any(Duration.class));
inOrder.verify(consumerMock).commitSync(commitCapture.capture());
inOrder.verify(collectorMock).emit(eq(SingleTopicKafkaSpoutConfiguration.STREAM),
anyList());
@@ -105,7 +106,7 @@ public class KafkaSpoutMessagingGuaranteeTest {
private void
doTestModeDisregardsMaxUncommittedOffsets(KafkaSpoutConfig<String, String>
spoutConfig) {
KafkaSpout<String, String> spout =
SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock,
collectorMock, consumerMock, partition);
- when(consumerMock.poll(anyLong()))
+ when(consumerMock.poll(any(Duration.class)))
.thenReturn(new
ConsumerRecords<>(Collections.singletonMap(partition,
SpoutWithMockedConsumerSetupHelper.createRecords(partition, 0,
spoutConfig.getMaxUncommittedOffsets()))))
.thenReturn(new
ConsumerRecords<>(Collections.singletonMap(partition,
@@ -115,7 +116,7 @@ public class KafkaSpoutMessagingGuaranteeTest {
spout.nextTuple();
}
- verify(consumerMock, times(2)).poll(anyLong());
+ verify(consumerMock, times(2)).poll(any(Duration.class));
verify(collectorMock, times(spoutConfig.getMaxUncommittedOffsets() *
2)).emit(eq(SingleTopicKafkaSpoutConfiguration.STREAM), anyList());
}
@@ -140,7 +141,7 @@ public class KafkaSpoutMessagingGuaranteeTest {
private void doTestModeCannotReplayTuples(KafkaSpoutConfig<String, String>
spoutConfig) {
KafkaSpout<String, String> spout =
SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock,
collectorMock, consumerMock, partition);
- when(consumerMock.poll(anyLong())).thenReturn(new
ConsumerRecords<>(Collections.singletonMap(partition,
+ when(consumerMock.poll(any(Duration.class))).thenReturn(new
ConsumerRecords<>(Collections.singletonMap(partition,
SpoutWithMockedConsumerSetupHelper.createRecords(partition, 0,
1))));
spout.nextTuple();
@@ -189,7 +190,7 @@ public class KafkaSpoutMessagingGuaranteeTest {
try (SimulatedTime ignored = new SimulatedTime()) {
KafkaSpout<String, String> spout =
SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock,
collectorMock, consumerMock, partition);
- when(consumerMock.poll(anyLong())).thenReturn(new
ConsumerRecords<>(Collections.singletonMap(partition,
+ when(consumerMock.poll(any(Duration.class))).thenReturn(new
ConsumerRecords<>(Collections.singletonMap(partition,
SpoutWithMockedConsumerSetupHelper.createRecords(partition, 0,
1))));
spout.nextTuple();
@@ -203,7 +204,7 @@ public class KafkaSpoutMessagingGuaranteeTest {
Time.advanceTime(KafkaSpout.TIMER_DELAY_MS +
spoutConfig.getOffsetsCommitPeriodMs());
- when(consumerMock.poll(anyLong())).thenReturn(new
ConsumerRecords<>(Collections.emptyMap()));
+ when(consumerMock.poll(any(Duration.class))).thenReturn(new
ConsumerRecords<>(Collections.emptyMap()));
spout.nextTuple();
@@ -222,7 +223,7 @@ public class KafkaSpoutMessagingGuaranteeTest {
try (SimulatedTime ignored = new SimulatedTime()) {
KafkaSpout<String, String> spout =
SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock,
collectorMock, consumerMock, partition);
- when(consumerMock.poll(anyLong())).thenReturn(new
ConsumerRecords<>(Collections.singletonMap(partition,
+ when(consumerMock.poll(any(Duration.class))).thenReturn(new
ConsumerRecords<>(Collections.singletonMap(partition,
SpoutWithMockedConsumerSetupHelper.createRecords(partition, 0,
1))));
spout.nextTuple();
@@ -256,7 +257,7 @@ public class KafkaSpoutMessagingGuaranteeTest {
KafkaSpout<String, String> spout =
SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock,
collectorMock, consumerMock, partition);
- when(consumerMock.poll(anyLong())).thenReturn(new
ConsumerRecords<>(Collections.singletonMap(partition,
+ when(consumerMock.poll(any(Duration.class))).thenReturn(new
ConsumerRecords<>(Collections.singletonMap(partition,
SpoutWithMockedConsumerSetupHelper.createRecords(partition, 0,
1))));
spout.nextTuple();
diff --git
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
index e1964864a..7a42b3c0f 100644
---
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
+++
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
@@ -26,6 +26,7 @@ import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.*;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -101,7 +102,7 @@ public class KafkaSpoutRebalanceTest {
when(consumerMock.assignment()).thenReturn(assignedPartitions);
//Make the consumer return a single message for each partition
- when(consumerMock.poll(anyLong()))
+ when(consumerMock.poll(any(Duration.class)))
.thenReturn(new
ConsumerRecords<>(Collections.singletonMap(partitionThatWillBeRevoked,
SpoutWithMockedConsumerSetupHelper.createRecords(partitionThatWillBeRevoked, 0,
1))))
.thenReturn(new
ConsumerRecords<>(Collections.singletonMap(assignedPartition,
SpoutWithMockedConsumerSetupHelper.createRecords(assignedPartition, 0, 1))))
.thenReturn(new ConsumerRecords<>(Collections.emptyMap()));
diff --git
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java
index 7982a5131..381796b73 100644
---
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java
+++
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java
@@ -19,7 +19,7 @@ import static
org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutC
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.anyList;
-import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
@@ -27,6 +27,7 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -84,7 +85,7 @@ public class KafkaSpoutRetryLimitTest {
int numRecords = lastOffset + 1;
records.put(partition,
SpoutWithMockedConsumerSetupHelper.createRecords(partition, 0, numRecords));
- when(consumerMock.poll(anyLong()))
+ when(consumerMock.poll(any(Duration.class)))
.thenReturn(new ConsumerRecords<>(records));
for (int i = 0; i < numRecords; i++) {
@@ -104,7 +105,7 @@ public class KafkaSpoutRetryLimitTest {
InOrder inOrder = inOrder(consumerMock);
inOrder.verify(consumerMock).commitSync(commitCapture.capture());
- inOrder.verify(consumerMock).poll(anyLong());
+ inOrder.verify(consumerMock).poll(any(Duration.class));
//verify that offset 4 was committed for the given TopicPartition,
since processing should resume at 4.
assertTrue(commitCapture.getValue().containsKey(partition));
diff --git
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SpoutWithMockedConsumerSetupHelper.java
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SpoutWithMockedConsumerSetupHelper.java
index 77e8e40b3..1c9823702 100644
---
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SpoutWithMockedConsumerSetupHelper.java
+++
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SpoutWithMockedConsumerSetupHelper.java
@@ -18,7 +18,6 @@ package org.apache.storm.kafka.spout;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyList;
-import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
@@ -27,6 +26,7 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -152,7 +152,7 @@ public class SpoutWithMockedConsumerSetupHelper {
records.put(tp, tpRecords);
}
- when(consumerMock.poll(anyLong()))
+ when(consumerMock.poll(any(Duration.class)))
.thenReturn(new ConsumerRecords<>(records));
for (int i = 0; i < totalOffsets; i++) {