This is an automated email from the ASF dual-hosted git repository.
pvillard pushed a commit to branch support/nifi-1.x
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/support/nifi-1.x by this push:
new 50a99b291f NIFI-12851 - ConsumeKafka, remove limitation on count of
subscribed topics
50a99b291f is described below
commit 50a99b291f35ecd39bdb81697bce4dbeea37f854
Author: Paul Grey <[email protected]>
AuthorDate: Wed Feb 28 16:39:10 2024 -0500
NIFI-12851 - ConsumeKafka, remove limitation on count of subscribed topics
Signed-off-by: Pierre Villard <[email protected]>
This closes #8460.
---
.../kafka/pubsub/ConsumeKafkaRecord_2_6.java | 2 +-
.../processors/kafka/pubsub/ConsumeKafka_2_6.java | 2 +-
.../kafka/pubsub/TestConsumeKafka_2_6.java | 38 ++++++++++++++++++++++
3 files changed, 40 insertions(+), 2 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_6.java
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_6.java
index bdf42b8c02..a7b5bc746b 100644
---
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_6.java
+++
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_6.java
@@ -467,7 +467,7 @@ public class ConsumeKafkaRecord_2_6 extends
AbstractProcessor implements KafkaCl
}
if (topicType.equals(TOPIC_NAME.getValue())) {
- for (final String topic : topicListing.split(",", 100)) {
+ for (final String topic : topicListing.split(",")) {
final String trimmedName = topic.trim();
if (!trimmedName.isEmpty()) {
topics.add(trimmedName);
diff --git
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_6.java
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_6.java
index 019f7daa55..9c4b8e8235 100644
---
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_6.java
+++
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_6.java
@@ -413,7 +413,7 @@ public class ConsumeKafka_2_6 extends AbstractProcessor
implements KafkaClientCo
}
if (topicType.equals(TOPIC_NAME.getValue())) {
- for (final String topic : topicListing.split(",", 100)) {
+ for (final String topic : topicListing.split(",")) {
final String trimmedName = topic.trim();
if (!trimmedName.isEmpty()) {
topics.add(trimmedName);
diff --git
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafka_2_6.java
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafka_2_6.java
index 31d2d10aaa..14028f623f 100644
---
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafka_2_6.java
+++
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafka_2_6.java
@@ -23,12 +23,20 @@ import
org.apache.nifi.kafka.shared.property.SecurityProtocol;
import org.apache.nifi.kerberos.KerberosCredentialsService;
import org.apache.nifi.kerberos.KerberosUserService;
import org.apache.nifi.kerberos.SelfContainedKerberosUserService;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import java.lang.reflect.Field;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
@@ -45,6 +53,36 @@ public class TestConsumeKafka_2_6 {
mockConsumerPool = mock(ConsumerPool.class);
}
+ @Test
+ public void validateNoLimitToTopicCount() {
+ final int expectedCount = 101;
+ final String topics = String.join(",",
Collections.nCopies(expectedCount, "foo"));
+ final ConsumeKafka_2_6 consumeKafka = new ConsumeKafka_2_6() {
+ protected ConsumerPool createConsumerPool(final ProcessContext
context, final ComponentLog log) {
+ final ConsumerPool consumerPool =
super.createConsumerPool(context, log);
+ try {
+ final Field topicsField =
ConsumerPool.class.getDeclaredField("topics");
+ topicsField.setAccessible(true);
+ final Object o = topicsField.get(consumerPool);
+ final List<?> list = assertInstanceOf(List.class, o);
+ assertEquals(expectedCount, list.size());
+ } catch (NoSuchFieldException | IllegalAccessException e) {
+ throw new RuntimeException(e);
+ }
+ return consumerPool;
+ }
+ };
+
+ TestRunner runner = TestRunners.newTestRunner(consumeKafka);
+ runner.setValidateExpressionUsage(false);
+ runner.setProperty(ConsumeKafka_2_6.BOOTSTRAP_SERVERS,
"localhost:1234");
+ runner.setProperty(ConsumeKafka_2_6.TOPICS, topics);
+ runner.setProperty(ConsumeKafka_2_6.GROUP_ID, "foo");
+ runner.setProperty(ConsumeKafka_2_6.AUTO_OFFSET_RESET,
ConsumeKafka_2_6.OFFSET_EARLIEST);
+ runner.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
ByteArrayDeserializer.class.getName());
+ runner.run();
+ }
+
@Test
public void validateCustomValidatorSettings() {
ConsumeKafka_2_6 consumeKafka = new ConsumeKafka_2_6();