This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 40bb1500be NIFI-14470 Fixed EL Handling in Kafka Key Property for PublishKafka
40bb1500be is described below
commit 40bb1500bef8a642e129b913c224df9b4a703abb
Author: Paul Grey <[email protected]>
AuthorDate: Wed Apr 23 17:33:58 2025 -0400
NIFI-14470 Fixed EL Handling in Kafka Key Property for PublishKafka
- Aligned behavior with Property Descriptor and previous implementation
This closes #9897
Signed-off-by: David Handermann <[email protected]>
---
...fkaIT.java => PublishKafkaExpressionKeyIT.java} | 31 +++++++---------------
.../nifi/kafka/processors/PublishKafkaIT.java | 3 ++-
.../kafka/processors/PublishKafkaRecordIT.java | 3 ++-
.../kafka/processors/PublishKafkaTombstoneIT.java | 3 ++-
.../apache/nifi/kafka/processors/PublishKafka.java | 7 ++---
.../producer/key/AttributeKeyFactory.java | 15 +++++++----
.../producer/key/AttributeKeyFactoryTest.java | 19 ++++++++++---
7 files changed, 45 insertions(+), 36 deletions(-)
diff --git
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/PublishKafkaIT.java
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/PublishKafkaExpressionKeyIT.java
similarity index 70%
copy from
nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/PublishKafkaIT.java
copy to
nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/PublishKafkaExpressionKeyIT.java
index 2308815432..5a2b37aebf 100644
---
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/PublishKafkaIT.java
+++
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/PublishKafkaExpressionKeyIT.java
@@ -19,7 +19,6 @@ package org.apache.nifi.kafka.processors;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.header.Header;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
@@ -27,20 +26,18 @@ import org.junit.jupiter.api.MethodOrderer;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestMethodOrder;
-import java.nio.charset.StandardCharsets;
-import java.util.Arrays;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
import java.util.Collections;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-
@TestMethodOrder(MethodOrderer.MethodName.class)
-public class PublishKafkaIT extends AbstractPublishKafkaIT {
- private static final String TEST_KEY_ATTRIBUTE = "my-key";
+public class PublishKafkaExpressionKeyIT extends AbstractPublishKafkaIT {
+ private static final String TEST_KEY = "some-key";
+ private static final String TEST_KEY_EL = "${some-key}";
private static final String TEST_KEY_VALUE = "some-key-value";
- private static final String TEST_RECORD_VALUE = "value-" +
System.currentTimeMillis();
+ private static final String TEST_VALUE = "some-value-" +
System.currentTimeMillis();
@Test
public void test_1_KafkaTestContainerProduceOne() throws
InitializationException {
@@ -48,15 +45,12 @@ public class PublishKafkaIT extends AbstractPublishKafkaIT {
runner.setValidateExpressionUsage(false);
runner.setProperty(PublishKafka.CONNECTION_SERVICE,
addKafkaConnectionService(runner));
runner.setProperty(PublishKafka.TOPIC_NAME, getClass().getName());
- runner.setProperty(PublishKafka.ATTRIBUTE_HEADER_PATTERN, "a.*");
- runner.setProperty(PublishKafka.KAFKA_KEY, TEST_KEY_ATTRIBUTE);
+ runner.setProperty(PublishKafka.KAFKA_KEY, TEST_KEY_EL);
final Map<String, String> attributes = new HashMap<>();
- attributes.put("a1", "valueA1");
- attributes.put("b1", "valueB1");
- attributes.put(TEST_KEY_ATTRIBUTE, TEST_KEY_VALUE);
+ attributes.put(TEST_KEY, TEST_KEY_VALUE);
- runner.enqueue(TEST_RECORD_VALUE, attributes);
+ runner.enqueue(TEST_VALUE, attributes);
runner.run();
runner.assertAllFlowFilesTransferred(PublishKafka.REL_SUCCESS, 1);
}
@@ -69,12 +63,7 @@ public class PublishKafkaIT extends AbstractPublishKafkaIT {
assertEquals(1, records.count());
final ConsumerRecord<String, String> record =
records.iterator().next();
assertEquals(TEST_KEY_VALUE, record.key());
- assertEquals(TEST_RECORD_VALUE, record.value());
- final List<Header> headers =
Arrays.asList(record.headers().toArray());
- assertEquals(1, headers.size());
- final Header header = record.headers().iterator().next();
- assertEquals("a1", header.key());
- assertEquals("valueA1", new String(header.value(),
StandardCharsets.UTF_8));
+ assertEquals(TEST_VALUE, record.value());
}
}
}
diff --git
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/PublishKafkaIT.java
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/PublishKafkaIT.java
index 2308815432..755edb4d5f 100644
---
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/PublishKafkaIT.java
+++
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/PublishKafkaIT.java
@@ -39,6 +39,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
@TestMethodOrder(MethodOrderer.MethodName.class)
public class PublishKafkaIT extends AbstractPublishKafkaIT {
private static final String TEST_KEY_ATTRIBUTE = "my-key";
+ private static final String TEST_KEY_ATTRIBUTE_EL = "${my-key}";
private static final String TEST_KEY_VALUE = "some-key-value";
private static final String TEST_RECORD_VALUE = "value-" +
System.currentTimeMillis();
@@ -49,7 +50,7 @@ public class PublishKafkaIT extends AbstractPublishKafkaIT {
runner.setProperty(PublishKafka.CONNECTION_SERVICE,
addKafkaConnectionService(runner));
runner.setProperty(PublishKafka.TOPIC_NAME, getClass().getName());
runner.setProperty(PublishKafka.ATTRIBUTE_HEADER_PATTERN, "a.*");
- runner.setProperty(PublishKafka.KAFKA_KEY, TEST_KEY_ATTRIBUTE);
+ runner.setProperty(PublishKafka.KAFKA_KEY, TEST_KEY_ATTRIBUTE_EL);
final Map<String, String> attributes = new HashMap<>();
attributes.put("a1", "valueA1");
diff --git
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/PublishKafkaRecordIT.java
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/PublishKafkaRecordIT.java
index 41fd9dbbc4..0325d87ba0 100644
---
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/PublishKafkaRecordIT.java
+++
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/PublishKafkaRecordIT.java
@@ -46,6 +46,7 @@ public class PublishKafkaRecordIT extends
AbstractPublishKafkaIT {
private static final String TEST_RESOURCE =
"org/apache/nifi/kafka/processors/publish/ff.json";
private static final String KEY_ATTRIBUTE_KEY = "keyAttribute";
+ private static final String KEY_ATTRIBUTE_KEY_EL = "${keyAttribute}";
private static final String KEY_ATTRIBUTE_VALUE = "keyAttributeValue";
private static final int TEST_RECORD_COUNT = 3;
@@ -59,7 +60,7 @@ public class PublishKafkaRecordIT extends
AbstractPublishKafkaIT {
addRecordWriterService(runner);
runner.setProperty(PublishKafka.TOPIC_NAME, getClass().getName());
- runner.setProperty(PublishKafka.KAFKA_KEY, KEY_ATTRIBUTE_KEY);
+ runner.setProperty(PublishKafka.KAFKA_KEY, KEY_ATTRIBUTE_KEY_EL);
runner.setProperty(PublishKafka.ATTRIBUTE_HEADER_PATTERN, "a.*");
final Map<String, String> attributes = new HashMap<>();
diff --git
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/PublishKafkaTombstoneIT.java
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/PublishKafkaTombstoneIT.java
index ad651b1051..7a35de913f 100644
---
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/PublishKafkaTombstoneIT.java
+++
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/PublishKafkaTombstoneIT.java
@@ -41,6 +41,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
@TestMethodOrder(MethodOrderer.MethodName.class)
public class PublishKafkaTombstoneIT extends AbstractPublishKafkaIT {
private static final String TEST_KEY_ATTRIBUTE = "my-key";
+ private static final String TEST_KEY_ATTRIBUTE_EL = "${my-key}";
private static final String TEST_KEY_VALUE = "some-key-value";
private static final byte[] TEST_RECORD_VALUE = new byte[0];
@@ -51,7 +52,7 @@ public class PublishKafkaTombstoneIT extends
AbstractPublishKafkaIT {
runner.setProperty(PublishKafka.CONNECTION_SERVICE,
addKafkaConnectionService(runner));
runner.setProperty(PublishKafka.TOPIC_NAME, getClass().getName());
runner.setProperty(PublishKafka.ATTRIBUTE_HEADER_PATTERN, "a.*");
- runner.setProperty(PublishKafka.KAFKA_KEY, TEST_KEY_ATTRIBUTE);
+ runner.setProperty(PublishKafka.KAFKA_KEY, TEST_KEY_ATTRIBUTE_EL);
final Map<String, String> attributes = new HashMap<>();
attributes.put("a1", "valueA1");
diff --git
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/PublishKafka.java
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/PublishKafka.java
index d504cf0b96..1777bd4504 100644
---
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/PublishKafka.java
+++
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/PublishKafka.java
@@ -167,7 +167,7 @@ public class PublishKafka extends AbstractProcessor
implements KafkaPublishCompo
static final PropertyDescriptor TRANSACTIONAL_ID_PREFIX = new
PropertyDescriptor.Builder()
.name("Transactional ID Prefix")
.description("Specifies the KafkaProducer config transactional.id
will be a generated UUID and will be prefixed with the configured string.")
- .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
.dependsOn(TRANSACTIONS_ENABLED, "true")
.required(false)
@@ -249,6 +249,7 @@ public class PublishKafka extends AbstractProcessor
implements KafkaPublishCompo
.description("For any attribute that is added as a Kafka Record
Header, this property indicates the Character Encoding to use for serializing
the headers.")
.addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
.defaultValue(StandardCharsets.UTF_8.displayName())
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.required(true)
.dependsOn(ATTRIBUTE_HEADER_PATTERN)
.build();
@@ -559,13 +560,13 @@ public class PublishKafka extends AbstractProcessor
implements KafkaPublishCompo
final RecordSetWriterFactory keyWriterFactory =
context.getProperty(RECORD_KEY_WRITER).asControllerService(RecordSetWriterFactory.class);
- final String kafkaKeyAttribute =
context.getProperty(KAFKA_KEY).getValue();
+ final PropertyValue kafkaKeyAttribute = context.getProperty(KAFKA_KEY);
final String keyAttributeEncoding =
context.getProperty(KEY_ATTRIBUTE_ENCODING).getValue();
final String messageKeyField = publishStrategy ==
PublishStrategy.USE_VALUE
?
context.getProperty(MESSAGE_KEY_FIELD).evaluateAttributeExpressions(flowFile).getValue()
: null;
final KeyFactory keyFactory = messageKeyField != null
? new MessageKeyFactory(flowFile, messageKeyField,
keyWriterFactory, getLogger())
- : new AttributeKeyFactory(kafkaKeyAttribute,
keyAttributeEncoding);
+ : new AttributeKeyFactory(flowFile, kafkaKeyAttribute,
keyAttributeEncoding);
if (readerFactory != null && writerFactory != null) {
return switch (publishStrategy) {
diff --git
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/producer/key/AttributeKeyFactory.java
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/producer/key/AttributeKeyFactory.java
index d1a64258ed..c3673bd633 100644
---
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/producer/key/AttributeKeyFactory.java
+++
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/producer/key/AttributeKeyFactory.java
@@ -16,6 +16,8 @@
*/
package org.apache.nifi.kafka.processors.producer.key;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.kafka.shared.attribute.KafkaFlowFileAttribute;
import org.apache.nifi.serialization.record.Record;
@@ -25,18 +27,21 @@ import java.util.Map;
import java.util.Optional;
public class AttributeKeyFactory implements KeyFactory {
- private final String keyAttribute;
+ private final FlowFile flowFile;
+ private final PropertyValue keyAttribute;
private final String keyAttributeEncoding;
- public AttributeKeyFactory(final String keyAttribute,
- final String keyAttributeEncoding) {
- this.keyAttribute = (keyAttribute == null) ?
KafkaFlowFileAttribute.KAFKA_KEY : keyAttribute;
+ public AttributeKeyFactory(final FlowFile flowFile, final PropertyValue
keyAttribute, final String keyAttributeEncoding) {
+ this.flowFile = flowFile;
+ this.keyAttribute = keyAttribute;
this.keyAttributeEncoding =
Optional.ofNullable(keyAttributeEncoding).orElse(StandardCharsets.UTF_8.name());
}
@Override
public byte[] getKey(final Map<String, String> attributes, final Record
record) throws UnsupportedEncodingException {
- final String keyAttributeValue = attributes.get(keyAttribute);
+ final String keyAttributeValue = keyAttribute.isSet()
+ ?
keyAttribute.evaluateAttributeExpressions(flowFile).getValue()
+ : attributes.get(KafkaFlowFileAttribute.KAFKA_KEY);
return (keyAttributeValue == null) ? null :
keyAttributeValue.getBytes(keyAttributeEncoding);
}
}
diff --git
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/kafka/processors/producer/key/AttributeKeyFactoryTest.java
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/kafka/processors/producer/key/AttributeKeyFactoryTest.java
index 72c3dbcb3b..5b14a10c85 100644
---
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/kafka/processors/producer/key/AttributeKeyFactoryTest.java
+++
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/kafka/processors/producer/key/AttributeKeyFactoryTest.java
@@ -16,12 +16,15 @@
*/
package org.apache.nifi.kafka.processors.producer.key;
+import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.MockPropertyValue;
import org.junit.jupiter.api.Test;
import java.io.UnsupportedEncodingException;
@@ -31,6 +34,7 @@ import java.util.HashMap;
import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
public class AttributeKeyFactoryTest {
@@ -39,8 +43,9 @@ public class AttributeKeyFactoryTest {
void testNullKeyAttribute() throws UnsupportedEncodingException {
final Map<String, String> attributes = new HashMap<>();
final Record record = fabricateRecord();
+ final PropertyValue propertyValue = new MockPropertyValue(null);
- final AttributeKeyFactory attributeKeyFactory = new
AttributeKeyFactory(null, null);
+ final AttributeKeyFactory attributeKeyFactory = new
AttributeKeyFactory(null, propertyValue, null);
assertNull(attributeKeyFactory.getKey(attributes, record));
}
@@ -48,9 +53,12 @@ public class AttributeKeyFactoryTest {
void testNullKeyAttributeValue() throws UnsupportedEncodingException {
final Map<String, String> attributes = new HashMap<>();
final Record record = fabricateRecord();
+ final MockFlowFile flowFile = new MockFlowFile(1L);
+ flowFile.putAttributes(attributes);
+ final PropertyValue propertyValue = new MockPropertyValue("${A}");
- final AttributeKeyFactory attributeKeyFactory = new
AttributeKeyFactory("A", null);
- assertNull(attributeKeyFactory.getKey(attributes, record));
+ final AttributeKeyFactory attributeKeyFactory = new
AttributeKeyFactory(flowFile, propertyValue, null);
+ assertEquals(0, attributeKeyFactory.getKey(attributes, record).length);
}
@Test
@@ -59,8 +67,11 @@ public class AttributeKeyFactoryTest {
attributes.put("A", "valueA");
attributes.put("B", "valueB");
final Record record = fabricateRecord();
+ final MockFlowFile flowFile = new MockFlowFile(1L);
+ flowFile.putAttributes(attributes);
+ final PropertyValue propertyValue = new MockPropertyValue("${A}");
- final AttributeKeyFactory attributeKeyFactory = new
AttributeKeyFactory("A", null);
+ final AttributeKeyFactory attributeKeyFactory = new
AttributeKeyFactory(flowFile, propertyValue, null);
assertArrayEquals("valueA".getBytes(StandardCharsets.UTF_8),
attributeKeyFactory.getKey(attributes, record));
}