This is an automated email from the ASF dual-hosted git repository.
pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 2a3e287ac8 NIFI-14271 - dynamic property support in
Kafka3ConnectionService
2a3e287ac8 is described below
commit 2a3e287ac8760aad1158061ea636209b741125a9
Author: Paul Grey <[email protected]>
AuthorDate: Fri Feb 21 20:41:59 2025 -0500
NIFI-14271 - dynamic property support in Kafka3ConnectionService
Signed-off-by: Pierre Villard <[email protected]>
This closes #9747.
---
.../nifi/kafka/processors/AbstractKafkaBaseIT.java | 7 +++
.../kafka/service/Kafka3ConnectionService.java | 53 ++++++++++++++++------
.../service/Kafka3ConnectionServiceBaseIT.java | 7 +++
.../nifi/kafka/processors/ConsumeKafkaTest.java | 14 ++++++
.../nifi/kafka/processors/PublishKafkaTest.java | 14 ++++++
.../validation/DynamicPropertyValidator.java | 10 ++--
6 files changed, 88 insertions(+), 17 deletions(-)
diff --git
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/AbstractKafkaBaseIT.java
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/AbstractKafkaBaseIT.java
index 0d2583bcd6..5114e78c49 100644
---
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/AbstractKafkaBaseIT.java
+++
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/AbstractKafkaBaseIT.java
@@ -38,6 +38,11 @@ public abstract class AbstractKafkaBaseIT {
protected static final String IMAGE_NAME = "confluentinc/cp-kafka:7.6.1";
// April 2024
+ private static final String DYNAMIC_PROPERTY_KEY_PUBLISH =
"delivery.timeout.ms";
+ private static final String DYNAMIC_PROPERTY_VALUE_PUBLISH = "60000";
+ private static final String DYNAMIC_PROPERTY_KEY_CONSUME =
"fetch.max.wait.ms";
+ private static final String DYNAMIC_PROPERTY_VALUE_CONSUME = "1000";
+
protected static final long TIMESTAMP = System.currentTimeMillis();
protected static final String CONNECTION_SERVICE_ID =
Kafka3ConnectionService.class.getSimpleName();
@@ -64,6 +69,8 @@ public abstract class AbstractKafkaBaseIT {
runner.addControllerService(CONNECTION_SERVICE_ID, connectionService);
runner.setProperty(connectionService,
Kafka3ConnectionService.BOOTSTRAP_SERVERS,
kafkaContainer.getBootstrapServers());
runner.setProperty(connectionService,
Kafka3ConnectionService.MAX_POLL_RECORDS, "1000");
+ runner.setProperty(connectionService, DYNAMIC_PROPERTY_KEY_PUBLISH,
DYNAMIC_PROPERTY_VALUE_PUBLISH);
+ runner.setProperty(connectionService, DYNAMIC_PROPERTY_KEY_CONSUME,
DYNAMIC_PROPERTY_VALUE_CONSUME);
runner.enableControllerService(connectionService);
return CONNECTION_SERVICE_ID;
}
diff --git
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-service/src/main/java/org/apache/nifi/kafka/service/Kafka3ConnectionService.java
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-service/src/main/java/org/apache/nifi/kafka/service/Kafka3ConnectionService.java
index 72f336d21c..a78470778b 100644
---
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-service/src/main/java/org/apache/nifi/kafka/service/Kafka3ConnectionService.java
+++
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-service/src/main/java/org/apache/nifi/kafka/service/Kafka3ConnectionService.java
@@ -53,6 +53,7 @@ import org.apache.nifi.kafka.shared.property.SaslMechanism;
import org.apache.nifi.kafka.shared.property.provider.KafkaPropertyProvider;
import
org.apache.nifi.kafka.shared.property.provider.StandardKafkaPropertyProvider;
import org.apache.nifi.kafka.shared.transaction.TransactionIdSupplier;
+import org.apache.nifi.kafka.shared.validation.DynamicPropertyValidator;
import org.apache.nifi.kerberos.SelfContainedKerberosUserService;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.util.StandardValidators;
@@ -246,12 +247,14 @@ public class Kafka3ConnectionService extends
AbstractControllerService implement
private volatile Properties clientProperties;
private volatile ServiceConfiguration serviceConfiguration;
+ private volatile Properties producerProperties;
private volatile Properties consumerProperties;
@OnEnabled
public void onEnabled(final ConfigurationContext configurationContext) {
clientProperties = getClientProperties(configurationContext);
serviceConfiguration = getServiceConfiguration(configurationContext);
+ producerProperties = getProducerProperties(configurationContext,
clientProperties);
consumerProperties = getConsumerProperties(configurationContext,
clientProperties);
}
@@ -261,6 +264,17 @@ public class Kafka3ConnectionService extends
AbstractControllerService implement
return PROPERTY_DESCRIPTORS;
}
+ @Override
+ protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final
String propertyDescriptorName) {
+ return new PropertyDescriptor.Builder()
+ .description("Specifies the value for '" +
propertyDescriptorName + "' Kafka Configuration.")
+ .name(propertyDescriptorName)
+ .addValidator(new
DynamicPropertyValidator(ProducerConfig.class, ConsumerConfig.class))
+ .dynamic(true)
+
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
+ .build();
+ }
+
@Override
public KafkaConsumerService getConsumerService(final PollingContext
pollingContext) {
Objects.requireNonNull(pollingContext, "Polling Context required");
@@ -299,24 +313,24 @@ public class Kafka3ConnectionService extends
AbstractControllerService implement
@Override
public KafkaProducerService getProducerService(final ProducerConfiguration
producerConfiguration) {
- final Properties propertiesProducer = new Properties();
- propertiesProducer.putAll(clientProperties);
+ final Properties properties = new Properties();
+ properties.putAll(producerProperties);
if (producerConfiguration.getTransactionsEnabled()) {
- propertiesProducer.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,
+ properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,
new
TransactionIdSupplier(producerConfiguration.getTransactionIdPrefix()).get());
}
if (producerConfiguration.getDeliveryGuarantee() != null) {
- propertiesProducer.put(ProducerConfig.ACKS_CONFIG,
producerConfiguration.getDeliveryGuarantee());
+ properties.put(ProducerConfig.ACKS_CONFIG,
producerConfiguration.getDeliveryGuarantee());
}
if (producerConfiguration.getCompressionCodec() != null) {
- propertiesProducer.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,
producerConfiguration.getCompressionCodec());
+ properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,
producerConfiguration.getCompressionCodec());
}
final String partitionClass =
producerConfiguration.getPartitionClass();
if (partitionClass != null &&
partitionClass.startsWith("org.apache.kafka")) {
- propertiesProducer.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,
partitionClass);
+ properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,
partitionClass);
}
- return new Kafka3ProducerService(propertiesProducer,
serviceConfiguration, producerConfiguration);
+ return new Kafka3ProducerService(properties, serviceConfiguration,
producerConfiguration);
}
@Override
@@ -352,23 +366,37 @@ public class Kafka3ConnectionService extends
AbstractControllerService implement
return results;
}
- private Properties getConsumerProperties(final PropertyContext
propertyContext, final Properties defaultProperties) {
+ private Properties getProducerProperties(final PropertyContext
propertyContext, final Properties defaultProperties) {
final Properties properties = new Properties();
properties.putAll(defaultProperties);
- final IsolationLevel isolationLevel =
propertyContext.getProperty(TRANSACTION_ISOLATION_LEVEL).asAllowableValue(IsolationLevel.class);
- properties.put(TRANSACTION_ISOLATION_LEVEL.getName(),
isolationLevel.getValue());
+ final KafkaPropertyProvider propertyProvider = new
StandardKafkaPropertyProvider(ProducerConfig.class);
+ final Map<String, Object> propertiesProvider =
propertyProvider.getProperties(propertyContext);
+ propertiesProvider.forEach((key, value) -> properties.setProperty(key,
value.toString()));
+
+ final long timePeriod =
propertyContext.getProperty(METADATA_WAIT_TIME).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS);
+ properties.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, timePeriod);
return properties;
}
- private Properties getClientProperties(final PropertyContext
propertyContext) {
+ private Properties getConsumerProperties(final PropertyContext
propertyContext, final Properties defaultProperties) {
final Properties properties = new Properties();
+ properties.putAll(defaultProperties);
final KafkaPropertyProvider propertyProvider = new
StandardKafkaPropertyProvider(ConsumerConfig.class);
final Map<String, Object> propertiesProvider =
propertyProvider.getProperties(propertyContext);
propertiesProvider.forEach((key, value) -> properties.setProperty(key,
value.toString()));
+ final IsolationLevel isolationLevel =
propertyContext.getProperty(TRANSACTION_ISOLATION_LEVEL).asAllowableValue(IsolationLevel.class);
+ properties.put(TRANSACTION_ISOLATION_LEVEL.getName(),
isolationLevel.getValue());
+
+ return properties;
+ }
+
+ private Properties getClientProperties(final PropertyContext
propertyContext) {
+ final Properties properties = new Properties();
+
final String configuredBootstrapServers =
propertyContext.getProperty(BOOTSTRAP_SERVERS).getValue();
properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
configuredBootstrapServers);
@@ -380,9 +408,6 @@ public class Kafka3ConnectionService extends
AbstractControllerService implement
final int requestTimeoutMs = getRequestTimeoutMs(propertyContext);
properties.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG,
requestTimeoutMs);
- final long timePeriod =
propertyContext.getProperty(METADATA_WAIT_TIME).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS);
- properties.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, timePeriod);
-
return properties;
}
diff --git
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-service/src/test/java/org/apache/nifi/kafka/service/Kafka3ConnectionServiceBaseIT.java
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-service/src/test/java/org/apache/nifi/kafka/service/Kafka3ConnectionServiceBaseIT.java
index 998be030df..bbb4ac8ffa 100644
---
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-service/src/test/java/org/apache/nifi/kafka/service/Kafka3ConnectionServiceBaseIT.java
+++
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-service/src/test/java/org/apache/nifi/kafka/service/Kafka3ConnectionServiceBaseIT.java
@@ -94,6 +94,11 @@ public class Kafka3ConnectionServiceBaseIT {
public static final String IMAGE_NAME = "confluentinc/cp-kafka:7.8.0"; //
December 2024
+ private static final String DYNAMIC_PROPERTY_KEY_PUBLISH =
"delivery.timeout.ms";
+ private static final String DYNAMIC_PROPERTY_VALUE_PUBLISH = "60000";
+ private static final String DYNAMIC_PROPERTY_KEY_CONSUME =
"fetch.max.wait.ms";
+ private static final String DYNAMIC_PROPERTY_VALUE_CONSUME = "1000";
+
private static final String GROUP_ID =
Kafka3ConnectionService.class.getSimpleName();
private static final String TOPIC =
Kafka3ConnectionServiceBaseIT.class.getSimpleName();
@@ -203,6 +208,8 @@ public class Kafka3ConnectionServiceBaseIT {
final Map<String, String> properties = new LinkedHashMap<>();
properties.put(Kafka3ConnectionService.BOOTSTRAP_SERVERS.getName(),
kafkaContainer.getBootstrapServers());
properties.put(Kafka3ConnectionService.CLIENT_TIMEOUT.getName(),
CLIENT_TIMEOUT);
+ properties.put(DYNAMIC_PROPERTY_KEY_PUBLISH,
DYNAMIC_PROPERTY_VALUE_PUBLISH);
+ properties.put(DYNAMIC_PROPERTY_KEY_CONSUME,
DYNAMIC_PROPERTY_VALUE_CONSUME);
return properties;
}
diff --git
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/kafka/processors/ConsumeKafkaTest.java
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/kafka/processors/ConsumeKafkaTest.java
index 87605dde0f..531e5fff07 100644
---
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/kafka/processors/ConsumeKafkaTest.java
+++
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/kafka/processors/ConsumeKafkaTest.java
@@ -47,6 +47,11 @@ class ConsumeKafkaTest {
private static final int FIRST_PARTITION = 0;
+ private static final String DYNAMIC_PROPERTY_KEY_PUBLISH =
"delivery.timeout.ms";
+ private static final String DYNAMIC_PROPERTY_VALUE_PUBLISH = "60000";
+ private static final String DYNAMIC_PROPERTY_KEY_CONSUME =
"fetch.max.wait.ms";
+ private static final String DYNAMIC_PROPERTY_VALUE_CONSUME = "1000";
+
private static final String SERVICE_ID =
KafkaConnectionService.class.getSimpleName();
private static final String CONSUMER_GROUP_ID =
ConsumeKafkaTest.class.getSimpleName();
@@ -115,6 +120,15 @@ class ConsumeKafkaTest {
assertNotNull(firstResult.getExplanation());
}
+ @Test
+ public void testDynamicProperties() throws InitializationException {
+ when(kafkaConnectionService.getIdentifier()).thenReturn(SERVICE_ID);
+ runner.addControllerService(SERVICE_ID, kafkaConnectionService);
+ runner.setProperty(kafkaConnectionService,
DYNAMIC_PROPERTY_KEY_PUBLISH, DYNAMIC_PROPERTY_VALUE_PUBLISH);
+ runner.setProperty(kafkaConnectionService,
DYNAMIC_PROPERTY_KEY_CONSUME, DYNAMIC_PROPERTY_VALUE_CONSUME);
+ runner.enableControllerService(kafkaConnectionService);
+ }
+
private void setConnectionService() throws InitializationException {
when(kafkaConnectionService.getIdentifier()).thenReturn(SERVICE_ID);
diff --git
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/kafka/processors/PublishKafkaTest.java
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/kafka/processors/PublishKafkaTest.java
index eb245fb518..69552cee70 100644
---
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/kafka/processors/PublishKafkaTest.java
+++
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/kafka/processors/PublishKafkaTest.java
@@ -47,6 +47,11 @@ class PublishKafkaTest {
private static final int FIRST_PARTITION = 0;
+ private static final String DYNAMIC_PROPERTY_KEY_PUBLISH =
"delivery.timeout.ms";
+ private static final String DYNAMIC_PROPERTY_VALUE_PUBLISH = "60000";
+ private static final String DYNAMIC_PROPERTY_KEY_CONSUME =
"fetch.max.wait.ms";
+ private static final String DYNAMIC_PROPERTY_VALUE_CONSUME = "1000";
+
private static final String SERVICE_ID =
KafkaConnectionService.class.getSimpleName();
@Mock
@@ -110,6 +115,15 @@ class PublishKafkaTest {
assertNotNull(firstResult.getExplanation());
}
+ @Test
+ public void testDynamicProperties() throws InitializationException {
+ when(kafkaConnectionService.getIdentifier()).thenReturn(SERVICE_ID);
+ runner.addControllerService(SERVICE_ID, kafkaConnectionService);
+ runner.setProperty(kafkaConnectionService,
DYNAMIC_PROPERTY_KEY_PUBLISH, DYNAMIC_PROPERTY_VALUE_PUBLISH);
+ runner.setProperty(kafkaConnectionService,
DYNAMIC_PROPERTY_KEY_CONSUME, DYNAMIC_PROPERTY_VALUE_CONSUME);
+ runner.enableControllerService(kafkaConnectionService);
+ }
+
private void setConnectionService() throws InitializationException {
when(kafkaConnectionService.getIdentifier()).thenReturn(SERVICE_ID);
diff --git
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/validation/DynamicPropertyValidator.java
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/validation/DynamicPropertyValidator.java
index 1ae8a7d233..a9b0585dd7 100644
---
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/validation/DynamicPropertyValidator.java
+++
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/validation/DynamicPropertyValidator.java
@@ -22,6 +22,7 @@ import org.apache.nifi.components.Validator;
import
org.apache.nifi.kafka.shared.property.provider.KafkaPropertyNameProvider;
import
org.apache.nifi.kafka.shared.property.provider.StandardKafkaPropertyNameProvider;
+import java.util.HashSet;
import java.util.Set;
/**
@@ -32,9 +33,12 @@ public class DynamicPropertyValidator implements Validator {
private final Set<String> clientPropertyNames;
- public DynamicPropertyValidator(final Class<?> kafkaClientClass) {
- final KafkaPropertyNameProvider provider = new
StandardKafkaPropertyNameProvider(kafkaClientClass);
- clientPropertyNames = provider.getPropertyNames();
+ public DynamicPropertyValidator(final Class<?>... kafkaClientClasses) {
+ clientPropertyNames = new HashSet<>();
+ for (Class<?> kafkaClientClass : kafkaClientClasses) {
+ final KafkaPropertyNameProvider provider = new
StandardKafkaPropertyNameProvider(kafkaClientClass);
+ clientPropertyNames.addAll(provider.getPropertyNames());
+ }
}
@Override