This is an automated email from the ASF dual-hosted git repository.

pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 2a3e287ac8 NIFI-14271 - dynamic property support in Kafka3ConnectionService
2a3e287ac8 is described below

commit 2a3e287ac8760aad1158061ea636209b741125a9
Author: Paul Grey <[email protected]>
AuthorDate: Fri Feb 21 20:41:59 2025 -0500

    NIFI-14271 - dynamic property support in Kafka3ConnectionService
    
    Signed-off-by: Pierre Villard <[email protected]>
    
    This closes #9747.
---
 .../nifi/kafka/processors/AbstractKafkaBaseIT.java |  7 +++
 .../kafka/service/Kafka3ConnectionService.java     | 53 ++++++++++++++++------
 .../service/Kafka3ConnectionServiceBaseIT.java     |  7 +++
 .../nifi/kafka/processors/ConsumeKafkaTest.java    | 14 ++++++
 .../nifi/kafka/processors/PublishKafkaTest.java    | 14 ++++++
 .../validation/DynamicPropertyValidator.java       | 10 ++--
 6 files changed, 88 insertions(+), 17 deletions(-)

diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/AbstractKafkaBaseIT.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/AbstractKafkaBaseIT.java
index 0d2583bcd6..5114e78c49 100644
--- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/AbstractKafkaBaseIT.java
+++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/AbstractKafkaBaseIT.java
@@ -38,6 +38,11 @@ public abstract class AbstractKafkaBaseIT {
 
     protected static final String IMAGE_NAME = "confluentinc/cp-kafka:7.6.1";  
// April 2024
 
+    private static final String DYNAMIC_PROPERTY_KEY_PUBLISH = 
"delivery.timeout.ms";
+    private static final String DYNAMIC_PROPERTY_VALUE_PUBLISH = "60000";
+    private static final String DYNAMIC_PROPERTY_KEY_CONSUME = 
"fetch.max.wait.ms";
+    private static final String DYNAMIC_PROPERTY_VALUE_CONSUME = "1000";
+
     protected static final long TIMESTAMP = System.currentTimeMillis();
 
     protected static final String CONNECTION_SERVICE_ID = 
Kafka3ConnectionService.class.getSimpleName();
@@ -64,6 +69,8 @@ public abstract class AbstractKafkaBaseIT {
         runner.addControllerService(CONNECTION_SERVICE_ID, connectionService);
         runner.setProperty(connectionService, 
Kafka3ConnectionService.BOOTSTRAP_SERVERS, 
kafkaContainer.getBootstrapServers());
         runner.setProperty(connectionService, 
Kafka3ConnectionService.MAX_POLL_RECORDS, "1000");
+        runner.setProperty(connectionService, DYNAMIC_PROPERTY_KEY_PUBLISH, 
DYNAMIC_PROPERTY_VALUE_PUBLISH);
+        runner.setProperty(connectionService, DYNAMIC_PROPERTY_KEY_CONSUME, 
DYNAMIC_PROPERTY_VALUE_CONSUME);
         runner.enableControllerService(connectionService);
         return CONNECTION_SERVICE_ID;
     }
diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-service/src/main/java/org/apache/nifi/kafka/service/Kafka3ConnectionService.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-service/src/main/java/org/apache/nifi/kafka/service/Kafka3ConnectionService.java
index 72f336d21c..a78470778b 100644
--- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-service/src/main/java/org/apache/nifi/kafka/service/Kafka3ConnectionService.java
+++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-service/src/main/java/org/apache/nifi/kafka/service/Kafka3ConnectionService.java
@@ -53,6 +53,7 @@ import org.apache.nifi.kafka.shared.property.SaslMechanism;
 import org.apache.nifi.kafka.shared.property.provider.KafkaPropertyProvider;
 import 
org.apache.nifi.kafka.shared.property.provider.StandardKafkaPropertyProvider;
 import org.apache.nifi.kafka.shared.transaction.TransactionIdSupplier;
+import org.apache.nifi.kafka.shared.validation.DynamicPropertyValidator;
 import org.apache.nifi.kerberos.SelfContainedKerberosUserService;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.util.StandardValidators;
@@ -246,12 +247,14 @@ public class Kafka3ConnectionService extends 
AbstractControllerService implement
 
     private volatile Properties clientProperties;
     private volatile ServiceConfiguration serviceConfiguration;
+    private volatile Properties producerProperties;
     private volatile Properties consumerProperties;
 
     @OnEnabled
     public void onEnabled(final ConfigurationContext configurationContext) {
         clientProperties = getClientProperties(configurationContext);
         serviceConfiguration = getServiceConfiguration(configurationContext);
+        producerProperties = getProducerProperties(configurationContext, 
clientProperties);
         consumerProperties = getConsumerProperties(configurationContext, 
clientProperties);
     }
 
@@ -261,6 +264,17 @@ public class Kafka3ConnectionService extends 
AbstractControllerService implement
         return PROPERTY_DESCRIPTORS;
     }
 
+    @Override
+    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final 
String propertyDescriptorName) {
+        return new PropertyDescriptor.Builder()
+                .description("Specifies the value for '" + 
propertyDescriptorName + "' Kafka Configuration.")
+                .name(propertyDescriptorName)
+                .addValidator(new 
DynamicPropertyValidator(ProducerConfig.class, ConsumerConfig.class))
+                .dynamic(true)
+                
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
+                .build();
+    }
+
     @Override
     public KafkaConsumerService getConsumerService(final PollingContext 
pollingContext) {
         Objects.requireNonNull(pollingContext, "Polling Context required");
@@ -299,24 +313,24 @@ public class Kafka3ConnectionService extends 
AbstractControllerService implement
 
     @Override
     public KafkaProducerService getProducerService(final ProducerConfiguration 
producerConfiguration) {
-        final Properties propertiesProducer = new Properties();
-        propertiesProducer.putAll(clientProperties);
+        final Properties properties = new Properties();
+        properties.putAll(producerProperties);
         if (producerConfiguration.getTransactionsEnabled()) {
-            propertiesProducer.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,
+            properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,
                     new 
TransactionIdSupplier(producerConfiguration.getTransactionIdPrefix()).get());
         }
         if (producerConfiguration.getDeliveryGuarantee() != null) {
-            propertiesProducer.put(ProducerConfig.ACKS_CONFIG, 
producerConfiguration.getDeliveryGuarantee());
+            properties.put(ProducerConfig.ACKS_CONFIG, 
producerConfiguration.getDeliveryGuarantee());
         }
         if (producerConfiguration.getCompressionCodec() != null) {
-            propertiesProducer.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, 
producerConfiguration.getCompressionCodec());
+            properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, 
producerConfiguration.getCompressionCodec());
         }
         final String partitionClass = 
producerConfiguration.getPartitionClass();
         if (partitionClass != null && 
partitionClass.startsWith("org.apache.kafka")) {
-            propertiesProducer.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, 
partitionClass);
+            properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, 
partitionClass);
         }
 
-        return new Kafka3ProducerService(propertiesProducer, 
serviceConfiguration, producerConfiguration);
+        return new Kafka3ProducerService(properties, serviceConfiguration, 
producerConfiguration);
     }
 
     @Override
@@ -352,23 +366,37 @@ public class Kafka3ConnectionService extends 
AbstractControllerService implement
         return results;
     }
 
-    private Properties getConsumerProperties(final PropertyContext 
propertyContext, final Properties defaultProperties) {
+    private Properties getProducerProperties(final PropertyContext 
propertyContext, final Properties defaultProperties) {
         final Properties properties = new Properties();
         properties.putAll(defaultProperties);
 
-        final IsolationLevel isolationLevel = 
propertyContext.getProperty(TRANSACTION_ISOLATION_LEVEL).asAllowableValue(IsolationLevel.class);
-        properties.put(TRANSACTION_ISOLATION_LEVEL.getName(), 
isolationLevel.getValue());
+        final KafkaPropertyProvider propertyProvider = new 
StandardKafkaPropertyProvider(ProducerConfig.class);
+        final Map<String, Object> propertiesProvider = 
propertyProvider.getProperties(propertyContext);
+        propertiesProvider.forEach((key, value) -> properties.setProperty(key, 
value.toString()));
+
+        final long timePeriod = 
propertyContext.getProperty(METADATA_WAIT_TIME).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS);
+        properties.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, timePeriod);
 
         return properties;
     }
 
-    private Properties getClientProperties(final PropertyContext 
propertyContext) {
+    private Properties getConsumerProperties(final PropertyContext 
propertyContext, final Properties defaultProperties) {
         final Properties properties = new Properties();
+        properties.putAll(defaultProperties);
 
         final KafkaPropertyProvider propertyProvider = new 
StandardKafkaPropertyProvider(ConsumerConfig.class);
         final Map<String, Object> propertiesProvider = 
propertyProvider.getProperties(propertyContext);
         propertiesProvider.forEach((key, value) -> properties.setProperty(key, 
value.toString()));
 
+        final IsolationLevel isolationLevel = 
propertyContext.getProperty(TRANSACTION_ISOLATION_LEVEL).asAllowableValue(IsolationLevel.class);
+        properties.put(TRANSACTION_ISOLATION_LEVEL.getName(), 
isolationLevel.getValue());
+
+        return properties;
+    }
+
+    private Properties getClientProperties(final PropertyContext 
propertyContext) {
+        final Properties properties = new Properties();
+
         final String configuredBootstrapServers = 
propertyContext.getProperty(BOOTSTRAP_SERVERS).getValue();
         properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, 
configuredBootstrapServers);
 
@@ -380,9 +408,6 @@ public class Kafka3ConnectionService extends 
AbstractControllerService implement
         final int requestTimeoutMs = getRequestTimeoutMs(propertyContext);
         properties.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 
requestTimeoutMs);
 
-        final long timePeriod = 
propertyContext.getProperty(METADATA_WAIT_TIME).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS);
-        properties.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, timePeriod);
-
         return properties;
     }
 
diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-service/src/test/java/org/apache/nifi/kafka/service/Kafka3ConnectionServiceBaseIT.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-service/src/test/java/org/apache/nifi/kafka/service/Kafka3ConnectionServiceBaseIT.java
index 998be030df..bbb4ac8ffa 100644
--- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-service/src/test/java/org/apache/nifi/kafka/service/Kafka3ConnectionServiceBaseIT.java
+++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-service/src/test/java/org/apache/nifi/kafka/service/Kafka3ConnectionServiceBaseIT.java
@@ -94,6 +94,11 @@ public class Kafka3ConnectionServiceBaseIT {
 
     public static final String IMAGE_NAME = "confluentinc/cp-kafka:7.8.0";  // 
December 2024
 
+    private static final String DYNAMIC_PROPERTY_KEY_PUBLISH = 
"delivery.timeout.ms";
+    private static final String DYNAMIC_PROPERTY_VALUE_PUBLISH = "60000";
+    private static final String DYNAMIC_PROPERTY_KEY_CONSUME = 
"fetch.max.wait.ms";
+    private static final String DYNAMIC_PROPERTY_VALUE_CONSUME = "1000";
+
     private static final String GROUP_ID = 
Kafka3ConnectionService.class.getSimpleName();
 
     private static final String TOPIC = 
Kafka3ConnectionServiceBaseIT.class.getSimpleName();
@@ -203,6 +208,8 @@ public class Kafka3ConnectionServiceBaseIT {
         final Map<String, String> properties = new LinkedHashMap<>();
         properties.put(Kafka3ConnectionService.BOOTSTRAP_SERVERS.getName(), 
kafkaContainer.getBootstrapServers());
         properties.put(Kafka3ConnectionService.CLIENT_TIMEOUT.getName(), 
CLIENT_TIMEOUT);
+        properties.put(DYNAMIC_PROPERTY_KEY_PUBLISH, 
DYNAMIC_PROPERTY_VALUE_PUBLISH);
+        properties.put(DYNAMIC_PROPERTY_KEY_CONSUME, 
DYNAMIC_PROPERTY_VALUE_CONSUME);
         return properties;
     }
 
diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/kafka/processors/ConsumeKafkaTest.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/kafka/processors/ConsumeKafkaTest.java
index 87605dde0f..531e5fff07 100644
--- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/kafka/processors/ConsumeKafkaTest.java
+++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/kafka/processors/ConsumeKafkaTest.java
@@ -47,6 +47,11 @@ class ConsumeKafkaTest {
 
     private static final int FIRST_PARTITION = 0;
 
+    private static final String DYNAMIC_PROPERTY_KEY_PUBLISH = 
"delivery.timeout.ms";
+    private static final String DYNAMIC_PROPERTY_VALUE_PUBLISH = "60000";
+    private static final String DYNAMIC_PROPERTY_KEY_CONSUME = 
"fetch.max.wait.ms";
+    private static final String DYNAMIC_PROPERTY_VALUE_CONSUME = "1000";
+
     private static final String SERVICE_ID = 
KafkaConnectionService.class.getSimpleName();
 
     private static final String CONSUMER_GROUP_ID = 
ConsumeKafkaTest.class.getSimpleName();
@@ -115,6 +120,15 @@ class ConsumeKafkaTest {
         assertNotNull(firstResult.getExplanation());
     }
 
+    @Test
+    public void testDynamicProperties() throws InitializationException {
+        when(kafkaConnectionService.getIdentifier()).thenReturn(SERVICE_ID);
+        runner.addControllerService(SERVICE_ID, kafkaConnectionService);
+        runner.setProperty(kafkaConnectionService, 
DYNAMIC_PROPERTY_KEY_PUBLISH, DYNAMIC_PROPERTY_VALUE_PUBLISH);
+        runner.setProperty(kafkaConnectionService, 
DYNAMIC_PROPERTY_KEY_CONSUME, DYNAMIC_PROPERTY_VALUE_CONSUME);
+        runner.enableControllerService(kafkaConnectionService);
+    }
+
     private void setConnectionService() throws InitializationException {
         when(kafkaConnectionService.getIdentifier()).thenReturn(SERVICE_ID);
 
diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/kafka/processors/PublishKafkaTest.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/kafka/processors/PublishKafkaTest.java
index eb245fb518..69552cee70 100644
--- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/kafka/processors/PublishKafkaTest.java
+++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/kafka/processors/PublishKafkaTest.java
@@ -47,6 +47,11 @@ class PublishKafkaTest {
 
     private static final int FIRST_PARTITION = 0;
 
+    private static final String DYNAMIC_PROPERTY_KEY_PUBLISH = 
"delivery.timeout.ms";
+    private static final String DYNAMIC_PROPERTY_VALUE_PUBLISH = "60000";
+    private static final String DYNAMIC_PROPERTY_KEY_CONSUME = 
"fetch.max.wait.ms";
+    private static final String DYNAMIC_PROPERTY_VALUE_CONSUME = "1000";
+
     private static final String SERVICE_ID = 
KafkaConnectionService.class.getSimpleName();
 
     @Mock
@@ -110,6 +115,15 @@ class PublishKafkaTest {
         assertNotNull(firstResult.getExplanation());
     }
 
+    @Test
+    public void testDynamicProperties() throws InitializationException {
+        when(kafkaConnectionService.getIdentifier()).thenReturn(SERVICE_ID);
+        runner.addControllerService(SERVICE_ID, kafkaConnectionService);
+        runner.setProperty(kafkaConnectionService, 
DYNAMIC_PROPERTY_KEY_PUBLISH, DYNAMIC_PROPERTY_VALUE_PUBLISH);
+        runner.setProperty(kafkaConnectionService, 
DYNAMIC_PROPERTY_KEY_CONSUME, DYNAMIC_PROPERTY_VALUE_CONSUME);
+        runner.enableControllerService(kafkaConnectionService);
+    }
+
     private void setConnectionService() throws InitializationException {
         when(kafkaConnectionService.getIdentifier()).thenReturn(SERVICE_ID);
 
diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/validation/DynamicPropertyValidator.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/validation/DynamicPropertyValidator.java
index 1ae8a7d233..a9b0585dd7 100644
--- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/validation/DynamicPropertyValidator.java
+++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/validation/DynamicPropertyValidator.java
@@ -22,6 +22,7 @@ import org.apache.nifi.components.Validator;
 import 
org.apache.nifi.kafka.shared.property.provider.KafkaPropertyNameProvider;
 import 
org.apache.nifi.kafka.shared.property.provider.StandardKafkaPropertyNameProvider;
 
+import java.util.HashSet;
 import java.util.Set;
 
 /**
@@ -32,9 +33,12 @@ public class DynamicPropertyValidator implements Validator {
 
     private final Set<String> clientPropertyNames;
 
-    public DynamicPropertyValidator(final Class<?> kafkaClientClass) {
-        final KafkaPropertyNameProvider provider = new 
StandardKafkaPropertyNameProvider(kafkaClientClass);
-        clientPropertyNames = provider.getPropertyNames();
+    public DynamicPropertyValidator(final Class<?>... kafkaClientClasses) {
+        clientPropertyNames = new HashSet<>();
+        for (Class<?> kafkaClientClass : kafkaClientClasses) {
+            final KafkaPropertyNameProvider provider = new 
StandardKafkaPropertyNameProvider(kafkaClientClass);
+            clientPropertyNames.addAll(provider.getPropertyNames());
+        }
     }
 
     @Override

Reply via email to