This is an automated email from the ASF dual-hosted git repository.
mthomsen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new af6039a NIFI-6045: Added support EL support to dyanmic properties for
Kafka 2.0 processors
af6039a is described below
commit af6039a5b06af70fe4c1c14abcba5f7b381535ee
Author: Corey Fritz <[email protected]>
AuthorDate: Thu Feb 7 13:48:19 2019 -0500
NIFI-6045: Added support EL support to dyanmic properties for Kafka 2.0
processors
This closes #3316
Signed-off-by: Mike Thomsen <[email protected]>
---
.../org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_0.java | 8 ++++++--
.../nifi/processors/kafka/pubsub/PublishKafkaRecord_2_0.java | 4 +++-
.../org/apache/nifi/processors/kafka/pubsub/PublishKafka_2_0.java | 4 +++-
3 files changed, 12 insertions(+), 4 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_0.java
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_0.java
index d651061..758ac87 100644
---
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_0.java
+++
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_0.java
@@ -73,7 +73,8 @@ import static
org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.UTF8_E
@DynamicProperty(name = "The name of a Kafka configuration property.", value =
"The value of a given Kafka configuration property.",
description = "These properties will be added on the Kafka
configuration after loading any provided configuration properties."
+ " In the event a dynamic property represents a property that was
already set, its value will be ignored and WARN message logged."
- + " For the list of available Kafka properties please refer to:
http://kafka.apache.org/documentation.html#configuration. ")
+ + " For the list of available Kafka properties please refer to:
http://kafka.apache.org/documentation.html#configuration. ",
+ expressionLanguageScope = ExpressionLanguageScope.VARIABLE_REGISTRY)
public class ConsumeKafka_2_0 extends AbstractProcessor {
static final AllowableValue OFFSET_EARLIEST = new
AllowableValue("earliest", "earliest", "Automatically reset the offset to the
earliest offset");
@@ -264,7 +265,10 @@ public class ConsumeKafka_2_0 extends AbstractProcessor {
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final
String propertyDescriptorName) {
return new PropertyDescriptor.Builder()
.description("Specifies the value for '" +
propertyDescriptorName + "' Kafka Configuration.")
- .name(propertyDescriptorName).addValidator(new
KafkaProcessorUtils.KafkaConfigValidator(ConsumerConfig.class)).dynamic(true)
+ .name(propertyDescriptorName)
+ .addValidator(new
KafkaProcessorUtils.KafkaConfigValidator(ConsumerConfig.class))
+ .dynamic(true)
+
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
}
diff --git
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_2_0.java
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_2_0.java
index 2dde669..9182084 100644
---
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_2_0.java
+++
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_2_0.java
@@ -72,7 +72,8 @@ import java.util.regex.Pattern;
@DynamicProperty(name = "The name of a Kafka configuration property.", value =
"The value of a given Kafka configuration property.",
description = "These properties will be added on the Kafka configuration
after loading any provided configuration properties."
+ " In the event a dynamic property represents a property that was already
set, its value will be ignored and WARN message logged."
- + " For the list of available Kafka properties please refer to:
http://kafka.apache.org/documentation.html#configuration. ")
+ + " For the list of available Kafka properties please refer to:
http://kafka.apache.org/documentation.html#configuration. ",
+ expressionLanguageScope = ExpressionLanguageScope.VARIABLE_REGISTRY)
@WritesAttribute(attribute = "msg.count", description = "The number of
messages that were sent to Kafka for this FlowFile. This attribute is added
only to "
+ "FlowFiles that are routed to success.")
@SeeAlso({PublishKafka_2_0.class, ConsumeKafka_2_0.class,
ConsumeKafkaRecord_2_0.class})
@@ -292,6 +293,7 @@ public class PublishKafkaRecord_2_0 extends
AbstractProcessor {
.name(propertyDescriptorName)
.addValidator(new
KafkaProcessorUtils.KafkaConfigValidator(ProducerConfig.class))
.dynamic(true)
+
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
}
diff --git
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_2_0.java
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_2_0.java
index b641eb4..251c10e 100644
---
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_2_0.java
+++
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_2_0.java
@@ -67,7 +67,8 @@ import java.util.regex.Pattern;
@DynamicProperty(name = "The name of a Kafka configuration property.", value =
"The value of a given Kafka configuration property.",
description = "These properties will be added on the Kafka configuration
after loading any provided configuration properties."
+ " In the event a dynamic property represents a property that was
already set, its value will be ignored and WARN message logged."
- + " For the list of available Kafka properties please refer to:
http://kafka.apache.org/documentation.html#configuration. ")
+ + " For the list of available Kafka properties please refer to:
http://kafka.apache.org/documentation.html#configuration. ",
+ expressionLanguageScope = ExpressionLanguageScope.VARIABLE_REGISTRY)
@WritesAttribute(attribute = "msg.count", description = "The number of
messages that were sent to Kafka for this FlowFile. This attribute is added
only to "
+ "FlowFiles that are routed to success. If the <Message Demarcator>
Property is not set, this will always be 1, but if the Property is set, it may "
+ "be greater than 1.")
@@ -288,6 +289,7 @@ public class PublishKafka_2_0 extends AbstractProcessor {
.name(propertyDescriptorName)
.addValidator(new
KafkaProcessorUtils.KafkaConfigValidator(ProducerConfig.class))
.dynamic(true)
+
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
}