This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 40bb1500be NIFI-14470 Fixed EL Handling in Kafka Key Property for PublishKafka
40bb1500be is described below

commit 40bb1500bef8a642e129b913c224df9b4a703abb
Author: Paul Grey <[email protected]>
AuthorDate: Wed Apr 23 17:33:58 2025 -0400

    NIFI-14470 Fixed EL Handling in Kafka Key Property for PublishKafka
    
    - Aligned behavior with Property Descriptor and previous implementation
    
    This closes #9897
    
    Signed-off-by: David Handermann <[email protected]>
---
 ...fkaIT.java => PublishKafkaExpressionKeyIT.java} | 31 +++++++---------------
 .../nifi/kafka/processors/PublishKafkaIT.java      |  3 ++-
 .../kafka/processors/PublishKafkaRecordIT.java     |  3 ++-
 .../kafka/processors/PublishKafkaTombstoneIT.java  |  3 ++-
 .../apache/nifi/kafka/processors/PublishKafka.java |  7 ++---
 .../producer/key/AttributeKeyFactory.java          | 15 +++++++----
 .../producer/key/AttributeKeyFactoryTest.java      | 19 ++++++++++---
 7 files changed, 45 insertions(+), 36 deletions(-)

diff --git 
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/PublishKafkaIT.java
 
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/PublishKafkaExpressionKeyIT.java
similarity index 70%
copy from 
nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/PublishKafkaIT.java
copy to 
nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/PublishKafkaExpressionKeyIT.java
index 2308815432..5a2b37aebf 100644
--- 
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/PublishKafkaIT.java
+++ 
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/PublishKafkaExpressionKeyIT.java
@@ -19,7 +19,6 @@ package org.apache.nifi.kafka.processors;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.header.Header;
 import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
@@ -27,20 +26,18 @@ import org.junit.jupiter.api.MethodOrderer;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestMethodOrder;
 
-import java.nio.charset.StandardCharsets;
-import java.util.Arrays;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 
-import static org.junit.jupiter.api.Assertions.assertEquals;
-
 @TestMethodOrder(MethodOrderer.MethodName.class)
-public class PublishKafkaIT extends AbstractPublishKafkaIT {
-    private static final String TEST_KEY_ATTRIBUTE = "my-key";
+public class PublishKafkaExpressionKeyIT extends AbstractPublishKafkaIT {
+    private static final String TEST_KEY = "some-key";
+    private static final String TEST_KEY_EL = "${some-key}";
     private static final String TEST_KEY_VALUE = "some-key-value";
-    private static final String TEST_RECORD_VALUE = "value-" + 
System.currentTimeMillis();
+    private static final String TEST_VALUE = "some-value-" + 
System.currentTimeMillis();
 
     @Test
     public void test_1_KafkaTestContainerProduceOne() throws 
InitializationException {
@@ -48,15 +45,12 @@ public class PublishKafkaIT extends AbstractPublishKafkaIT {
         runner.setValidateExpressionUsage(false);
         runner.setProperty(PublishKafka.CONNECTION_SERVICE, 
addKafkaConnectionService(runner));
         runner.setProperty(PublishKafka.TOPIC_NAME, getClass().getName());
-        runner.setProperty(PublishKafka.ATTRIBUTE_HEADER_PATTERN, "a.*");
-        runner.setProperty(PublishKafka.KAFKA_KEY, TEST_KEY_ATTRIBUTE);
+        runner.setProperty(PublishKafka.KAFKA_KEY, TEST_KEY_EL);
 
         final Map<String, String> attributes = new HashMap<>();
-        attributes.put("a1", "valueA1");
-        attributes.put("b1", "valueB1");
-        attributes.put(TEST_KEY_ATTRIBUTE, TEST_KEY_VALUE);
+        attributes.put(TEST_KEY, TEST_KEY_VALUE);
 
-        runner.enqueue(TEST_RECORD_VALUE, attributes);
+        runner.enqueue(TEST_VALUE, attributes);
         runner.run();
         runner.assertAllFlowFilesTransferred(PublishKafka.REL_SUCCESS, 1);
     }
@@ -69,12 +63,7 @@ public class PublishKafkaIT extends AbstractPublishKafkaIT {
             assertEquals(1, records.count());
             final ConsumerRecord<String, String> record = 
records.iterator().next();
             assertEquals(TEST_KEY_VALUE, record.key());
-            assertEquals(TEST_RECORD_VALUE, record.value());
-            final List<Header> headers = 
Arrays.asList(record.headers().toArray());
-            assertEquals(1, headers.size());
-            final Header header = record.headers().iterator().next();
-            assertEquals("a1", header.key());
-            assertEquals("valueA1", new String(header.value(), 
StandardCharsets.UTF_8));
+            assertEquals(TEST_VALUE, record.value());
         }
     }
 }
diff --git 
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/PublishKafkaIT.java
 
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/PublishKafkaIT.java
index 2308815432..755edb4d5f 100644
--- 
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/PublishKafkaIT.java
+++ 
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/PublishKafkaIT.java
@@ -39,6 +39,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 @TestMethodOrder(MethodOrderer.MethodName.class)
 public class PublishKafkaIT extends AbstractPublishKafkaIT {
     private static final String TEST_KEY_ATTRIBUTE = "my-key";
+    private static final String TEST_KEY_ATTRIBUTE_EL = "${my-key}";
     private static final String TEST_KEY_VALUE = "some-key-value";
     private static final String TEST_RECORD_VALUE = "value-" + 
System.currentTimeMillis();
 
@@ -49,7 +50,7 @@ public class PublishKafkaIT extends AbstractPublishKafkaIT {
         runner.setProperty(PublishKafka.CONNECTION_SERVICE, 
addKafkaConnectionService(runner));
         runner.setProperty(PublishKafka.TOPIC_NAME, getClass().getName());
         runner.setProperty(PublishKafka.ATTRIBUTE_HEADER_PATTERN, "a.*");
-        runner.setProperty(PublishKafka.KAFKA_KEY, TEST_KEY_ATTRIBUTE);
+        runner.setProperty(PublishKafka.KAFKA_KEY, TEST_KEY_ATTRIBUTE_EL);
 
         final Map<String, String> attributes = new HashMap<>();
         attributes.put("a1", "valueA1");
diff --git 
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/PublishKafkaRecordIT.java
 
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/PublishKafkaRecordIT.java
index 41fd9dbbc4..0325d87ba0 100644
--- 
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/PublishKafkaRecordIT.java
+++ 
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/PublishKafkaRecordIT.java
@@ -46,6 +46,7 @@ public class PublishKafkaRecordIT extends 
AbstractPublishKafkaIT {
     private static final String TEST_RESOURCE = 
"org/apache/nifi/kafka/processors/publish/ff.json";
 
     private static final String KEY_ATTRIBUTE_KEY = "keyAttribute";
+    private static final String KEY_ATTRIBUTE_KEY_EL = "${keyAttribute}";
     private static final String KEY_ATTRIBUTE_VALUE = "keyAttributeValue";
 
     private static final int TEST_RECORD_COUNT = 3;
@@ -59,7 +60,7 @@ public class PublishKafkaRecordIT extends 
AbstractPublishKafkaIT {
         addRecordWriterService(runner);
 
         runner.setProperty(PublishKafka.TOPIC_NAME, getClass().getName());
-        runner.setProperty(PublishKafka.KAFKA_KEY, KEY_ATTRIBUTE_KEY);
+        runner.setProperty(PublishKafka.KAFKA_KEY, KEY_ATTRIBUTE_KEY_EL);
         runner.setProperty(PublishKafka.ATTRIBUTE_HEADER_PATTERN, "a.*");
 
         final Map<String, String> attributes = new HashMap<>();
diff --git 
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/PublishKafkaTombstoneIT.java
 
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/PublishKafkaTombstoneIT.java
index ad651b1051..7a35de913f 100644
--- 
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/PublishKafkaTombstoneIT.java
+++ 
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/PublishKafkaTombstoneIT.java
@@ -41,6 +41,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 @TestMethodOrder(MethodOrderer.MethodName.class)
 public class PublishKafkaTombstoneIT extends AbstractPublishKafkaIT {
     private static final String TEST_KEY_ATTRIBUTE = "my-key";
+    private static final String TEST_KEY_ATTRIBUTE_EL = "${my-key}";
     private static final String TEST_KEY_VALUE = "some-key-value";
     private static final byte[] TEST_RECORD_VALUE = new byte[0];
 
@@ -51,7 +52,7 @@ public class PublishKafkaTombstoneIT extends 
AbstractPublishKafkaIT {
         runner.setProperty(PublishKafka.CONNECTION_SERVICE, 
addKafkaConnectionService(runner));
         runner.setProperty(PublishKafka.TOPIC_NAME, getClass().getName());
         runner.setProperty(PublishKafka.ATTRIBUTE_HEADER_PATTERN, "a.*");
-        runner.setProperty(PublishKafka.KAFKA_KEY, TEST_KEY_ATTRIBUTE);
+        runner.setProperty(PublishKafka.KAFKA_KEY, TEST_KEY_ATTRIBUTE_EL);
 
         final Map<String, String> attributes = new HashMap<>();
         attributes.put("a1", "valueA1");
diff --git 
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/PublishKafka.java
 
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/PublishKafka.java
index d504cf0b96..1777bd4504 100644
--- 
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/PublishKafka.java
+++ 
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/PublishKafka.java
@@ -167,7 +167,7 @@ public class PublishKafka extends AbstractProcessor 
implements KafkaPublishCompo
     static final PropertyDescriptor TRANSACTIONAL_ID_PREFIX = new 
PropertyDescriptor.Builder()
             .name("Transactional ID Prefix")
             .description("Specifies the KafkaProducer config transactional.id 
will be a generated UUID and will be prefixed with the configured string.")
-            .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
             .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
             .dependsOn(TRANSACTIONS_ENABLED, "true")
             .required(false)
@@ -249,6 +249,7 @@ public class PublishKafka extends AbstractProcessor 
implements KafkaPublishCompo
             .description("For any attribute that is added as a Kafka Record 
Header, this property indicates the Character Encoding to use for serializing 
the headers.")
             .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
             .defaultValue(StandardCharsets.UTF_8.displayName())
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
             .required(true)
             .dependsOn(ATTRIBUTE_HEADER_PATTERN)
             .build();
@@ -559,13 +560,13 @@ public class PublishKafka extends AbstractProcessor 
implements KafkaPublishCompo
 
         final RecordSetWriterFactory keyWriterFactory = 
context.getProperty(RECORD_KEY_WRITER).asControllerService(RecordSetWriterFactory.class);
 
-        final String kafkaKeyAttribute = 
context.getProperty(KAFKA_KEY).getValue();
+        final PropertyValue kafkaKeyAttribute = context.getProperty(KAFKA_KEY);
         final String keyAttributeEncoding = 
context.getProperty(KEY_ATTRIBUTE_ENCODING).getValue();
         final String messageKeyField = publishStrategy == 
PublishStrategy.USE_VALUE
                 ? 
context.getProperty(MESSAGE_KEY_FIELD).evaluateAttributeExpressions(flowFile).getValue()
 : null;
         final KeyFactory keyFactory = messageKeyField != null
                 ? new MessageKeyFactory(flowFile, messageKeyField, 
keyWriterFactory, getLogger())
-                : new AttributeKeyFactory(kafkaKeyAttribute, 
keyAttributeEncoding);
+                : new AttributeKeyFactory(flowFile, kafkaKeyAttribute, 
keyAttributeEncoding);
 
         if (readerFactory != null && writerFactory != null) {
             return switch (publishStrategy) {
diff --git 
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/producer/key/AttributeKeyFactory.java
 
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/producer/key/AttributeKeyFactory.java
index d1a64258ed..c3673bd633 100644
--- 
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/producer/key/AttributeKeyFactory.java
+++ 
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/producer/key/AttributeKeyFactory.java
@@ -16,6 +16,8 @@
  */
 package org.apache.nifi.kafka.processors.producer.key;
 
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.kafka.shared.attribute.KafkaFlowFileAttribute;
 import org.apache.nifi.serialization.record.Record;
 
@@ -25,18 +27,21 @@ import java.util.Map;
 import java.util.Optional;
 
 public class AttributeKeyFactory implements KeyFactory {
-    private final String keyAttribute;
+    private final FlowFile flowFile;
+    private final PropertyValue keyAttribute;
     private final String keyAttributeEncoding;
 
-    public AttributeKeyFactory(final String keyAttribute,
-                               final String keyAttributeEncoding) {
-        this.keyAttribute = (keyAttribute == null) ? 
KafkaFlowFileAttribute.KAFKA_KEY : keyAttribute;
+    public AttributeKeyFactory(final FlowFile flowFile, final PropertyValue 
keyAttribute, final String keyAttributeEncoding) {
+        this.flowFile = flowFile;
+        this.keyAttribute = keyAttribute;
         this.keyAttributeEncoding = 
Optional.ofNullable(keyAttributeEncoding).orElse(StandardCharsets.UTF_8.name());
     }
 
     @Override
     public byte[] getKey(final Map<String, String> attributes, final Record 
record) throws UnsupportedEncodingException {
-        final String keyAttributeValue = attributes.get(keyAttribute);
+        final String keyAttributeValue = keyAttribute.isSet()
+                ? 
keyAttribute.evaluateAttributeExpressions(flowFile).getValue()
+                : attributes.get(KafkaFlowFileAttribute.KAFKA_KEY);
         return (keyAttributeValue == null) ? null : 
keyAttributeValue.getBytes(keyAttributeEncoding);
     }
 }
diff --git 
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/kafka/processors/producer/key/AttributeKeyFactoryTest.java
 
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/kafka/processors/producer/key/AttributeKeyFactoryTest.java
index 72c3dbcb3b..5b14a10c85 100644
--- 
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/kafka/processors/producer/key/AttributeKeyFactoryTest.java
+++ 
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/kafka/processors/producer/key/AttributeKeyFactoryTest.java
@@ -16,12 +16,15 @@
  */
 package org.apache.nifi.kafka.processors.producer.key;
 
+import org.apache.nifi.components.PropertyValue;
 import org.apache.nifi.serialization.SimpleRecordSchema;
 import org.apache.nifi.serialization.record.MapRecord;
 import org.apache.nifi.serialization.record.Record;
 import org.apache.nifi.serialization.record.RecordField;
 import org.apache.nifi.serialization.record.RecordFieldType;
 import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.MockPropertyValue;
 import org.junit.jupiter.api.Test;
 
 import java.io.UnsupportedEncodingException;
@@ -31,6 +34,7 @@ import java.util.HashMap;
 import java.util.Map;
 
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNull;
 
 public class AttributeKeyFactoryTest {
@@ -39,8 +43,9 @@ public class AttributeKeyFactoryTest {
     void testNullKeyAttribute() throws UnsupportedEncodingException {
         final Map<String, String> attributes = new HashMap<>();
         final Record record = fabricateRecord();
+        final PropertyValue propertyValue = new MockPropertyValue(null);
 
-        final AttributeKeyFactory attributeKeyFactory = new 
AttributeKeyFactory(null, null);
+        final AttributeKeyFactory attributeKeyFactory = new 
AttributeKeyFactory(null, propertyValue, null);
         assertNull(attributeKeyFactory.getKey(attributes, record));
     }
 
@@ -48,9 +53,12 @@ public class AttributeKeyFactoryTest {
     void testNullKeyAttributeValue() throws UnsupportedEncodingException {
         final Map<String, String> attributes = new HashMap<>();
         final Record record = fabricateRecord();
+        final MockFlowFile flowFile = new MockFlowFile(1L);
+        flowFile.putAttributes(attributes);
+        final PropertyValue propertyValue = new MockPropertyValue("${A}");
 
-        final AttributeKeyFactory attributeKeyFactory = new 
AttributeKeyFactory("A", null);
-        assertNull(attributeKeyFactory.getKey(attributes, record));
+        final AttributeKeyFactory attributeKeyFactory = new 
AttributeKeyFactory(flowFile, propertyValue, null);
+        assertEquals(0, attributeKeyFactory.getKey(attributes, record).length);
     }
 
     @Test
@@ -59,8 +67,11 @@ public class AttributeKeyFactoryTest {
         attributes.put("A", "valueA");
         attributes.put("B", "valueB");
         final Record record = fabricateRecord();
+        final MockFlowFile flowFile = new MockFlowFile(1L);
+        flowFile.putAttributes(attributes);
+        final PropertyValue propertyValue = new MockPropertyValue("${A}");
 
-        final AttributeKeyFactory attributeKeyFactory = new 
AttributeKeyFactory("A", null);
+        final AttributeKeyFactory attributeKeyFactory = new 
AttributeKeyFactory(flowFile, propertyValue, null);
         assertArrayEquals("valueA".getBytes(StandardCharsets.UTF_8), 
attributeKeyFactory.getKey(attributes, record));
     }
 

Reply via email to