This is an automated email from the ASF dual-hosted git repository.
pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 91f6b42 NIFI-8085 Use poll(Duration) in ConsumeKafka_2_x processors
91f6b42 is described below
commit 91f6b42985c20ee9b1ef177d16c70c820a5cdf0e
Author: Peter Turcsanyi <[email protected]>
AuthorDate: Sat Dec 12 13:54:56 2020 +0100
NIFI-8085 Use poll(Duration) in ConsumeKafka_2_x processors
Signed-off-by: Pierre Villard <[email protected]>
This closes #4725.
---
.../nifi/processors/kafka/pubsub/ConsumerLease.java | 3 ++-
.../nifi/processors/kafka/pubsub/ConsumerPoolTest.java | 15 +++++++--------
.../nifi/processors/kafka/pubsub/ConsumerLease.java | 3 ++-
.../nifi/processors/kafka/pubsub/ConsumerPoolTest.java | 13 +++++++------
4 files changed, 18 insertions(+), 16 deletions(-)
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
index 3ecec49..729c801 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
@@ -48,6 +48,7 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -178,7 +179,7 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
* This behavior has been fixed via Kafka KIP-62 and available from
Kafka client 0.10.1.0.
*/
try {
- final ConsumerRecords<byte[], byte[]> records = kafkaConsumer.poll(10);
+ final ConsumerRecords<byte[], byte[]> records = kafkaConsumer.poll(Duration.ofMillis(10));
lastPollEmpty = records.count() == 0;
processRecords(records);
} catch (final ProcessException pe) {
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java
index 3414420..195d2cb 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java
@@ -24,8 +24,6 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processors.kafka.pubsub.ConsumerLease;
-import org.apache.nifi.processors.kafka.pubsub.ConsumerPool;
import org.apache.nifi.processors.kafka.pubsub.ConsumerPool.PoolStats;
import org.apache.nifi.provenance.ProvenanceReporter;
import org.junit.Before;
@@ -33,6 +31,7 @@ import org.junit.Test;
import org.mockito.Mockito;
import java.nio.charset.StandardCharsets;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -43,7 +42,7 @@ import java.util.regex.Pattern;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
-import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@@ -112,7 +111,7 @@ public class ConsumerPoolTest {
@Test
public void validatePoolSimpleCreateClose() throws Exception {
- when(consumer.poll(anyLong())).thenReturn(createConsumerRecords("nifi", 0, 0L, new byte[][]{}));
+ when(consumer.poll(any(Duration.class))).thenReturn(createConsumerRecords("nifi", 0, 0L, new byte[][]{}));
try (final ConsumerLease lease = testPool.obtainConsumer(mockSession,
mockContext)) {
lease.poll();
}
@@ -144,7 +143,7 @@ public class ConsumerPoolTest {
};
final ConsumerRecords<byte[], byte[]> firstRecs =
createConsumerRecords("foo", 1, 1L, firstPassValues);
- when(consumer.poll(anyLong())).thenReturn(firstRecs, createConsumerRecords("nifi", 0, 0L, new byte[][]{}));
+ when(consumer.poll(any(Duration.class))).thenReturn(firstRecs, createConsumerRecords("nifi", 0, 0L, new byte[][]{}));
try (final ConsumerLease lease = testPool.obtainConsumer(mockSession,
mockContext)) {
lease.poll();
lease.commit();
@@ -160,7 +159,7 @@ public class ConsumerPoolTest {
@Test
public void validatePoolSimpleBatchCreateClose() throws Exception {
- when(consumer.poll(anyLong())).thenReturn(createConsumerRecords("nifi", 0, 0L, new byte[][]{}));
+ when(consumer.poll(any(Duration.class))).thenReturn(createConsumerRecords("nifi", 0, 0L, new byte[][]{}));
for (int i = 0; i < 100; i++) {
try (final ConsumerLease lease =
testPool.obtainConsumer(mockSession, mockContext)) {
for (int j = 0; j < 100; j++) {
@@ -187,7 +186,7 @@ public class ConsumerPoolTest {
};
final ConsumerRecords<byte[], byte[]> firstRecs =
createConsumerRecords("foo", 1, 1L, firstPassValues);
- when(consumer.poll(anyLong())).thenReturn(firstRecs, createConsumerRecords("nifi", 0, 0L, new byte[][]{}));
+ when(consumer.poll(any(Duration.class))).thenReturn(firstRecs, createConsumerRecords("nifi", 0, 0L, new byte[][]{}));
try (final ConsumerLease lease =
testDemarcatedPool.obtainConsumer(mockSession, mockContext)) {
lease.poll();
lease.commit();
@@ -204,7 +203,7 @@ public class ConsumerPoolTest {
@Test
public void validatePoolConsumerFails() throws Exception {
- when(consumer.poll(anyLong())).thenThrow(new KafkaException("oops"));
+ when(consumer.poll(any(Duration.class))).thenThrow(new KafkaException("oops"));
try (final ConsumerLease lease = testPool.obtainConsumer(mockSession,
mockContext)) {
try {
lease.poll();
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
index c3846a2..e3e6124 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
@@ -48,6 +48,7 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -178,7 +179,7 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
* This behavior has been fixed via Kafka KIP-62 and available from
Kafka client 0.10.1.0.
*/
try {
- final ConsumerRecords<byte[], byte[]> records = kafkaConsumer.poll(10);
+ final ConsumerRecords<byte[], byte[]> records = kafkaConsumer.poll(Duration.ofMillis(10));
lastPollEmpty = records.count() == 0;
processRecords(records);
} catch (final ProcessException pe) {
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java
index 18e188c..195d2cb 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java
@@ -31,6 +31,7 @@ import org.junit.Test;
import org.mockito.Mockito;
import java.nio.charset.StandardCharsets;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -41,7 +42,7 @@ import java.util.regex.Pattern;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
-import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@@ -110,7 +111,7 @@ public class ConsumerPoolTest {
@Test
public void validatePoolSimpleCreateClose() throws Exception {
- when(consumer.poll(anyLong())).thenReturn(createConsumerRecords("nifi", 0, 0L, new byte[][]{}));
+ when(consumer.poll(any(Duration.class))).thenReturn(createConsumerRecords("nifi", 0, 0L, new byte[][]{}));
try (final ConsumerLease lease = testPool.obtainConsumer(mockSession,
mockContext)) {
lease.poll();
}
@@ -142,7 +143,7 @@ public class ConsumerPoolTest {
};
final ConsumerRecords<byte[], byte[]> firstRecs =
createConsumerRecords("foo", 1, 1L, firstPassValues);
- when(consumer.poll(anyLong())).thenReturn(firstRecs, createConsumerRecords("nifi", 0, 0L, new byte[][]{}));
+ when(consumer.poll(any(Duration.class))).thenReturn(firstRecs, createConsumerRecords("nifi", 0, 0L, new byte[][]{}));
try (final ConsumerLease lease = testPool.obtainConsumer(mockSession,
mockContext)) {
lease.poll();
lease.commit();
@@ -158,7 +159,7 @@ public class ConsumerPoolTest {
@Test
public void validatePoolSimpleBatchCreateClose() throws Exception {
- when(consumer.poll(anyLong())).thenReturn(createConsumerRecords("nifi", 0, 0L, new byte[][]{}));
+ when(consumer.poll(any(Duration.class))).thenReturn(createConsumerRecords("nifi", 0, 0L, new byte[][]{}));
for (int i = 0; i < 100; i++) {
try (final ConsumerLease lease =
testPool.obtainConsumer(mockSession, mockContext)) {
for (int j = 0; j < 100; j++) {
@@ -185,7 +186,7 @@ public class ConsumerPoolTest {
};
final ConsumerRecords<byte[], byte[]> firstRecs =
createConsumerRecords("foo", 1, 1L, firstPassValues);
- when(consumer.poll(anyLong())).thenReturn(firstRecs, createConsumerRecords("nifi", 0, 0L, new byte[][]{}));
+ when(consumer.poll(any(Duration.class))).thenReturn(firstRecs, createConsumerRecords("nifi", 0, 0L, new byte[][]{}));
try (final ConsumerLease lease =
testDemarcatedPool.obtainConsumer(mockSession, mockContext)) {
lease.poll();
lease.commit();
@@ -202,7 +203,7 @@ public class ConsumerPoolTest {
@Test
public void validatePoolConsumerFails() throws Exception {
- when(consumer.poll(anyLong())).thenThrow(new KafkaException("oops"));
+ when(consumer.poll(any(Duration.class))).thenThrow(new KafkaException("oops"));
try (final ConsumerLease lease = testPool.obtainConsumer(mockSession,
mockContext)) {
try {
lease.poll();