http://git-wip-us.apache.org/repos/asf/nifi/blob/4c787799/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/main/java/org/apache/nifi/processors/hl7/ExtractHL7Attributes.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/main/java/org/apache/nifi/processors/hl7/ExtractHL7Attributes.java b/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/main/java/org/apache/nifi/processors/hl7/ExtractHL7Attributes.java index 4ceb12e..6039aa0 100644 --- a/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/main/java/org/apache/nifi/processors/hl7/ExtractHL7Attributes.java +++ b/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/main/java/org/apache/nifi/processors/hl7/ExtractHL7Attributes.java @@ -41,6 +41,7 @@ import org.apache.nifi.annotation.behavior.SupportsBatching; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.ProcessContext; @@ -88,7 +89,7 @@ public class ExtractHL7Attributes extends AbstractProcessor { .displayName("Character Encoding") .description("The Character Encoding that is used to encode the HL7 data") .required(true) - .expressionLanguageSupported(true) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR) .defaultValue("UTF-8") .build();
http://git-wip-us.apache.org/repos/asf/nifi/blob/4c787799/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/main/java/org/apache/nifi/processors/hl7/RouteHL7.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/main/java/org/apache/nifi/processors/hl7/RouteHL7.java b/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/main/java/org/apache/nifi/processors/hl7/RouteHL7.java index ba90370..6a8df89 100644 --- a/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/main/java/org/apache/nifi/processors/hl7/RouteHL7.java +++ b/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/main/java/org/apache/nifi/processors/hl7/RouteHL7.java @@ -41,6 +41,7 @@ import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.components.Validator; +import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.hl7.hapi.HapiMessage; import org.apache.nifi.hl7.model.HL7Message; @@ -81,7 +82,7 @@ public class RouteHL7 extends AbstractProcessor { .name("Character Encoding") .description("The Character Encoding that is used to encode the HL7 data") .required(true) - .expressionLanguageSupported(true) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR) .defaultValue("UTF-8") .build(); http://git-wip-us.apache.org/repos/asf/nifi/blob/4c787799/nifi-nar-bundles/nifi-html-bundle/nifi-html-processors/src/main/java/org/apache/nifi/AbstractHTMLProcessor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-html-bundle/nifi-html-processors/src/main/java/org/apache/nifi/AbstractHTMLProcessor.java b/nifi-nar-bundles/nifi-html-bundle/nifi-html-processors/src/main/java/org/apache/nifi/AbstractHTMLProcessor.java index 8ad6f8a..902f215 100644 --- a/nifi-nar-bundles/nifi-html-bundle/nifi-html-processors/src/main/java/org/apache/nifi/AbstractHTMLProcessor.java +++ b/nifi-nar-bundles/nifi-html-bundle/nifi-html-processors/src/main/java/org/apache/nifi/AbstractHTMLProcessor.java @@ -20,6 +20,7 @@ import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.components.Validator; +import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.ProcessContext; @@ -68,7 +69,7 @@ public abstract class AbstractHTMLProcessor extends AbstractProcessor { " when an attribute value is extracted from a HTML element.") .required(true) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(true) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .build(); public static final PropertyDescriptor CSS_SELECTOR = new PropertyDescriptor @@ -76,7 +77,7 @@ public abstract class AbstractHTMLProcessor extends AbstractProcessor { .description("CSS selector syntax string used to extract the desired HTML element(s).") .required(true) .addValidator(CSS_SELECTOR_VALIDATOR) - .expressionLanguageSupported(true) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .build(); public static final PropertyDescriptor HTML_CHARSET = new PropertyDescriptor http://git-wip-us.apache.org/repos/asf/nifi/blob/4c787799/nifi-nar-bundles/nifi-html-bundle/nifi-html-processors/src/main/java/org/apache/nifi/GetHTMLElement.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-html-bundle/nifi-html-processors/src/main/java/org/apache/nifi/GetHTMLElement.java b/nifi-nar-bundles/nifi-html-bundle/nifi-html-processors/src/main/java/org/apache/nifi/GetHTMLElement.java index 713fabd..8346f03 100644 --- a/nifi-nar-bundles/nifi-html-bundle/nifi-html-processors/src/main/java/org/apache/nifi/GetHTMLElement.java +++ b/nifi-nar-bundles/nifi-html-bundle/nifi-html-processors/src/main/java/org/apache/nifi/GetHTMLElement.java @@ -20,6 +20,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.SupportsBatching; import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; @@ -77,7 +78,7 @@ public class GetHTMLElement .description("Prepends the specified value to the resulting Element") .required(false) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(true) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .build(); public static final PropertyDescriptor APPEND_ELEMENT_VALUE = new PropertyDescriptor @@ -85,7 +86,7 @@ public class GetHTMLElement .description("Appends the specified value to the resulting Element") .required(false) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(true) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .build(); public static final PropertyDescriptor ATTRIBUTE_KEY = new PropertyDescriptor @@ -97,10 +98,9 @@ public class GetHTMLElement " an absolute URL form using the specified base URL.")) .required(false) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(true) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .build(); - public static final PropertyDescriptor OUTPUT_TYPE = new PropertyDescriptor.Builder() .name("Output Type") .description("Controls the type of DOM value that is retrieved from the HTML element.") http://git-wip-us.apache.org/repos/asf/nifi/blob/4c787799/nifi-nar-bundles/nifi-html-bundle/nifi-html-processors/src/main/java/org/apache/nifi/ModifyHTMLElement.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-html-bundle/nifi-html-processors/src/main/java/org/apache/nifi/ModifyHTMLElement.java b/nifi-nar-bundles/nifi-html-bundle/nifi-html-processors/src/main/java/org/apache/nifi/ModifyHTMLElement.java index 7f6e12e..24f9741 100644 --- a/nifi-nar-bundles/nifi-html-bundle/nifi-html-processors/src/main/java/org/apache/nifi/ModifyHTMLElement.java +++ b/nifi-nar-bundles/nifi-html-bundle/nifi-html-processors/src/main/java/org/apache/nifi/ModifyHTMLElement.java @@ -24,6 +24,7 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; @@ -79,7 +80,7 @@ public class ModifyHTMLElement extends AbstractHTMLProcessor { .description("Value to update the found HTML elements with") .required(true) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(true) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .build(); public static final PropertyDescriptor ATTRIBUTE_KEY = new PropertyDescriptor @@ -88,7 +89,7 @@ public class ModifyHTMLElement extends AbstractHTMLProcessor { " which attribute on the selected element will be modified with the new value.")) .required(false) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(true) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .build(); private List<PropertyDescriptor> descriptors; http://git-wip-us.apache.org/repos/asf/nifi/blob/4c787799/nifi-nar-bundles/nifi-html-bundle/nifi-html-processors/src/main/java/org/apache/nifi/PutHTMLElement.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-html-bundle/nifi-html-processors/src/main/java/org/apache/nifi/PutHTMLElement.java b/nifi-nar-bundles/nifi-html-bundle/nifi-html-processors/src/main/java/org/apache/nifi/PutHTMLElement.java index bc9b70c..0d112cb 100644 --- a/nifi-nar-bundles/nifi-html-bundle/nifi-html-processors/src/main/java/org/apache/nifi/PutHTMLElement.java +++ b/nifi-nar-bundles/nifi-html-bundle/nifi-html-processors/src/main/java/org/apache/nifi/PutHTMLElement.java @@ -22,6 +22,7 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; @@ -78,7 +79,7 @@ public class PutHTMLElement extends AbstractHTMLProcessor { "encoded in the resulting output.") .required(true) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(true) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .build(); private List<PropertyDescriptor> descriptors; @@ -133,7 +134,7 @@ public class PutHTMLElement extends AbstractHTMLProcessor { final Elements eles; try { doc = parseHTMLDocumentFromFlowfile(flowFile, context, session); - eles = doc.select(context.getProperty(CSS_SELECTOR).evaluateAttributeExpressions().getValue()); + eles = doc.select(context.getProperty(CSS_SELECTOR).evaluateAttributeExpressions(flowFile).getValue()); } catch (Exception ex) { getLogger().error("Failed to extract HTML from {} due to {}; routing to {}", new Object[] {flowFile, ex.toString(), REL_INVALID_HTML.getName()}, ex); session.transfer(flowFile, REL_INVALID_HTML); http://git-wip-us.apache.org/repos/asf/nifi/blob/4c787799/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/main/java/org/apache/nifi/processors/ignite/cache/AbstractIgniteCacheProcessor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/main/java/org/apache/nifi/processors/ignite/cache/AbstractIgniteCacheProcessor.java b/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/main/java/org/apache/nifi/processors/ignite/cache/AbstractIgniteCacheProcessor.java index ca6136c..2ea7343 100644 --- a/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/main/java/org/apache/nifi/processors/ignite/cache/AbstractIgniteCacheProcessor.java +++ b/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/main/java/org/apache/nifi/processors/ignite/cache/AbstractIgniteCacheProcessor.java @@ -23,6 +23,7 @@ import java.util.Set; import org.apache.ignite.IgniteCache; import org.apache.nifi.annotation.lifecycle.OnShutdown; import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.expression.AttributeExpression.ResultType; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.Relationship; @@ -56,7 +57,7 @@ public abstract class AbstractIgniteCacheProcessor extends AbstractIgniteProcess "for determining Ignite cache key for the Flow File content") .required(true) .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(ResultType.STRING, true)) - .expressionLanguageSupported(true) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .build(); /** http://git-wip-us.apache.org/repos/asf/nifi/blob/4c787799/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/main/java/org/apache/nifi/processors/influxdb/AbstractInfluxDBProcessor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/main/java/org/apache/nifi/processors/influxdb/AbstractInfluxDBProcessor.java b/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/main/java/org/apache/nifi/processors/influxdb/AbstractInfluxDBProcessor.java index 13838de..cedde21 100644 --- a/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/main/java/org/apache/nifi/processors/influxdb/AbstractInfluxDBProcessor.java +++ b/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/main/java/org/apache/nifi/processors/influxdb/AbstractInfluxDBProcessor.java @@ -23,6 +23,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.DataUnit; import org.apache.nifi.processor.ProcessContext; @@ -44,7 +45,7 @@ abstract class AbstractInfluxDBProcessor extends AbstractProcessor { .description("Specifies the character set of the document data.") .required(true) .defaultValue("UTF-8") - .expressionLanguageSupported(true) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR) .build(); @@ -54,7 +55,7 @@ abstract class AbstractInfluxDBProcessor extends AbstractProcessor { .description("InfluxDB URL to connect to. Eg: http://influxdb:8086") .defaultValue("http://localhost:8086") .required(true) - .expressionLanguageSupported(true) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .addValidator(StandardValidators.URL_VALIDATOR) .build(); @@ -72,7 +73,7 @@ abstract class AbstractInfluxDBProcessor extends AbstractProcessor { .displayName("Database Name") .description("InfluxDB database to connect to") .required(true) - .expressionLanguageSupported(true) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); @@ -81,7 +82,7 @@ abstract class AbstractInfluxDBProcessor extends AbstractProcessor { .displayName("Username") .required(false) .description("Username for accessing InfluxDB") - .expressionLanguageSupported(true) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); @@ -90,7 +91,7 @@ abstract class AbstractInfluxDBProcessor extends AbstractProcessor { .displayName("Password") .required(false) .description("Password for user") - .expressionLanguageSupported(true) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .sensitive(true) .build(); @@ -99,7 +100,7 @@ abstract class AbstractInfluxDBProcessor extends AbstractProcessor { .name("influxdb-max-records-size") .displayName("Max size of records") .description("Maximum size of records allowed to be posted in one batch") - .expressionLanguageSupported(true) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .defaultValue("1 MB") .required(true) .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) http://git-wip-us.apache.org/repos/asf/nifi/blob/4c787799/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/main/java/org/apache/nifi/processors/influxdb/PutInfluxDB.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/main/java/org/apache/nifi/processors/influxdb/PutInfluxDB.java b/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/main/java/org/apache/nifi/processors/influxdb/PutInfluxDB.java index ed45025..f507768 100644 --- a/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/main/java/org/apache/nifi/processors/influxdb/PutInfluxDB.java +++ b/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/main/java/org/apache/nifi/processors/influxdb/PutInfluxDB.java @@ -27,6 +27,7 @@ import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; @@ -67,7 +68,7 @@ public class PutInfluxDB extends AbstractInfluxDBProcessor { .description("InfluxDB consistency level") .required(true) .defaultValue(CONSISTENCY_LEVEL_ONE.getValue()) - .expressionLanguageSupported(true) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .allowableValues(CONSISTENCY_LEVEL_ONE, CONSISTENCY_LEVEL_ANY, CONSISTENCY_LEVEL_ALL, CONSISTENCY_LEVEL_QUORUM) .build(); @@ -77,7 +78,7 @@ public class PutInfluxDB extends AbstractInfluxDBProcessor { .description("Retention policy for the saving the records") .defaultValue("autogen") .required(true) - .expressionLanguageSupported(true) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); http://git-wip-us.apache.org/repos/asf/nifi/blob/4c787799/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/cf/JMSConnectionFactoryProvider.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/cf/JMSConnectionFactoryProvider.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/cf/JMSConnectionFactoryProvider.java index 5c822be..e9c24d5 100644 --- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/cf/JMSConnectionFactoryProvider.java +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/cf/JMSConnectionFactoryProvider.java @@ -39,6 +39,7 @@ import org.apache.nifi.annotation.lifecycle.OnEnabled; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.controller.AbstractControllerService; import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.ssl.SSLContextService; @@ -88,7 +89,7 @@ public class JMSConnectionFactoryProvider extends AbstractControllerService impl + "class (i.e., org.apache.activemq.ActiveMQConnectionFactory)") .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .required(true) - .expressionLanguageSupported(true) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .build(); public static final PropertyDescriptor CLIENT_LIB_DIR_PATH = new PropertyDescriptor.Builder() .name(CF_LIB) @@ -98,7 +99,7 @@ public class JMSConnectionFactoryProvider extends AbstractControllerService impl + "ConnectionFactory implementation.") .addValidator(new ClientLibValidator()) .required(true) - .expressionLanguageSupported(true) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .build(); // ConnectionFactory specific properties @@ -109,7 +110,7 @@ public class JMSConnectionFactoryProvider extends AbstractControllerService impl + "'tcp://myhost:61616' for ActiveMQ or 'myhost:1414' for IBM MQ") .addValidator(new NonEmptyBrokerURIValidator()) .required(true) - .expressionLanguageSupported(true) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .build(); public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder() @@ -133,7 +134,9 @@ public class JMSConnectionFactoryProvider extends AbstractControllerService impl return new PropertyDescriptor.Builder() .description("Specifies the value for '" + propertyDescriptorName + "' property to be set on the provided ConnectionFactory implementation.") - .name(propertyDescriptorName).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).dynamic(true) + .name(propertyDescriptorName) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .dynamic(true) .build(); } http://git-wip-us.apache.org/repos/asf/nifi/blob/4c787799/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java index 1ac468c..1b8a8f3 100644 --- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java @@ -28,6 +28,7 @@ import javax.jms.ConnectionFactory; import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.jms.cf.JMSConnectionFactoryProvider; import org.apache.nifi.jms.cf.JMSConnectionFactoryProviderDefinition; import org.apache.nifi.processor.AbstractProcessor; @@ -73,7 +74,7 @@ abstract class AbstractJMSProcessor<T extends JMSWorker> extends AbstractProcess .description("The name of the JMS Destination. Usually provided by the administrator (e.g., 'topic://myTopic' or 'myTopic').") .required(true) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(true) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .build(); static final PropertyDescriptor DESTINATION_TYPE = new PropertyDescriptor.Builder() .name("Destination Type") @@ -90,7 +91,7 @@ abstract class AbstractJMSProcessor<T extends JMSWorker> extends AbstractProcess "Please see JMS spec for further details") .required(false) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(true) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .build(); static final PropertyDescriptor SESSION_CACHE_SIZE = new PropertyDescriptor.Builder() .name("Session Cache size") @@ -114,7 +115,7 @@ abstract class AbstractJMSProcessor<T extends JMSWorker> extends AbstractProcess .required(true) .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR) .defaultValue(Charset.defaultCharset().name()) - .expressionLanguageSupported(true) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .build(); http://git-wip-us.apache.org/repos/asf/nifi/blob/4c787799/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java index e3e9a75..b7103ff 100644 --- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java @@ -36,6 +36,7 @@ import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.jms.cf.JMSConnectionFactoryProvider; import org.apache.nifi.jms.processors.JMSConsumer.ConsumerCallback; @@ -106,7 +107,7 @@ public class ConsumeJMS extends AbstractJMSProcessor<JMSConsumer> { .description("If destination is Topic if present then make it the consumer durable. " + "@see https://docs.oracle.com/javaee/7/api/javax/jms/Session.html#createDurableConsumer-javax.jms.Topic-java.lang.String-") .required(false) - .expressionLanguageSupported(true) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .defaultValue("false") .allowableValues("true", "false") .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) @@ -116,7 +117,7 @@ public class ConsumeJMS extends AbstractJMSProcessor<JMSConsumer> { .description("If destination is Topic if present then make it the consumer shared. " + "@see https://docs.oracle.com/javaee/7/api/javax/jms/Session.html#createSharedConsumer-javax.jms.Topic-java.lang.String-") .required(false) - .expressionLanguageSupported(true) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .defaultValue("false") .allowableValues("true", "false") .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) @@ -126,7 +127,7 @@ public class ConsumeJMS extends AbstractJMSProcessor<JMSConsumer> { .description("The name of the subscription to use if destination is Topic and is shared or durable.") .required(false) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(true) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .build(); public static final Relationship REL_SUCCESS = new Relationship.Builder() http://git-wip-us.apache.org/repos/asf/nifi/blob/4c787799/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java index c555fd7..f58b9cf 100644 --- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java @@ -128,7 +128,7 @@ public class PublishJMS extends AbstractJMSProcessor<JMSPublisher> { break; } processSession.transfer(flowFile, REL_SUCCESS); - processSession.getProvenanceReporter().send(flowFile, context.getProperty(DESTINATION).evaluateAttributeExpressions().getValue()); + processSession.getProvenanceReporter().send(flowFile, destinationName); } catch (Exception e) { processSession.transfer(flowFile, REL_FAILURE); this.getLogger().error("Failed while sending message to JMS via " + publisher, e); http://git-wip-us.apache.org/repos/asf/nifi/blob/4c787799/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_0_10.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_0_10.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_0_10.java index 3c0bddc..f0026a4 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_0_10.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_0_10.java @@ -44,6 +44,7 @@ import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.ProcessContext; @@ -87,7 +88,7 @@ public class ConsumeKafkaRecord_0_10 extends AbstractProcessor { .description("The name of the Kafka Topic(s) to pull from. More than one can be supplied if comma separated.") .required(true) .addValidator(StandardValidators.NON_BLANK_VALIDATOR) - .expressionLanguageSupported(true) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .build(); static final PropertyDescriptor TOPIC_TYPE = new PropertyDescriptor.Builder() @@ -104,7 +105,7 @@ public class ConsumeKafkaRecord_0_10 extends AbstractProcessor { .displayName("Record Reader") .description("The Record Reader to use for incoming FlowFiles") .identifiesControllerService(RecordReaderFactory.class) - .expressionLanguageSupported(false) + .expressionLanguageSupported(ExpressionLanguageScope.NONE) .required(true) .build(); @@ -113,7 +114,7 @@ public class ConsumeKafkaRecord_0_10 extends AbstractProcessor { .displayName("Record Writer") .description("The Record Writer to use in order to serialize the data before sending to Kafka") .identifiesControllerService(RecordSetWriterFactory.class) - .expressionLanguageSupported(false) + .expressionLanguageSupported(ExpressionLanguageScope.NONE) .required(true) .build(); @@ -123,7 +124,7 @@ public class ConsumeKafkaRecord_0_10 extends AbstractProcessor { .description("A Group ID is used to identify consumers that are within the same consumer group. Corresponds to Kafka's 'group.id' property.") .required(true) .addValidator(StandardValidators.NON_BLANK_VALIDATOR) - .expressionLanguageSupported(false) + .expressionLanguageSupported(ExpressionLanguageScope.NONE) .build(); static final PropertyDescriptor AUTO_OFFSET_RESET = new PropertyDescriptor.Builder() http://git-wip-us.apache.org/repos/asf/nifi/blob/4c787799/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_0_10.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_0_10.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_0_10.java index f6425de..cab15de 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_0_10.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_0_10.java @@ -43,6 +43,7 @@ import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.ProcessContext; @@ -87,7 +88,7 @@ public class ConsumeKafka_0_10 extends AbstractProcessor { .description("The name of the Kafka Topic(s) to pull from. More than one can be supplied if comma separated.") .required(true) .addValidator(StandardValidators.NON_BLANK_VALIDATOR) - .expressionLanguageSupported(true) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .build(); static final PropertyDescriptor TOPIC_TYPE = new PropertyDescriptor.Builder() @@ -105,7 +106,7 @@ public class ConsumeKafka_0_10 extends AbstractProcessor { .description("A Group ID is used to identify consumers that are within the same consumer group. Corresponds to Kafka's 'group.id' property.") .required(true) .addValidator(StandardValidators.NON_BLANK_VALIDATOR) - .expressionLanguageSupported(false) + .expressionLanguageSupported(ExpressionLanguageScope.NONE) .build(); static final PropertyDescriptor AUTO_OFFSET_RESET = new PropertyDescriptor.Builder() @@ -132,7 +133,7 @@ public class ConsumeKafka_0_10 extends AbstractProcessor { .displayName("Message Demarcator") .required(false) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(true) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .description("Since KafkaConsumer receives messages in batches, you have an option to output FlowFiles which contains " + "all Kafka messages in a single batch for a given topic and partition and this property allows you to provide a string (interpreted as UTF-8) to use " + "for demarcating apart multiple Kafka messages. This is an optional property and if not provided each Kafka message received " http://git-wip-us.apache.org/repos/asf/nifi/blob/4c787799/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java index d835607..5b65b0d 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java @@ -41,6 +41,7 @@ import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.components.Validator; +import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.kerberos.KerberosCredentialsService; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.util.StandardValidators; @@ -76,7 +77,7 @@ final class KafkaProcessorUtils { .description("A comma-separated list of known Kafka Brokers in the format <host>:<port>") .required(true) .addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR) - .expressionLanguageSupported(true) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .defaultValue("localhost:9092") .build(); static final PropertyDescriptor SECURITY_PROTOCOL = new PropertyDescriptor.Builder() @@ -84,7 +85,7 @@ final class KafkaProcessorUtils { .displayName("Security Protocol") .description("Protocol used to communicate with brokers. Corresponds to Kafka's 'security.protocol' property.") .required(true) - .expressionLanguageSupported(false) + .expressionLanguageSupported(ExpressionLanguageScope.NONE) .allowableValues(SEC_PLAINTEXT, SEC_SSL, SEC_SASL_PLAINTEXT, SEC_SASL_SSL) .defaultValue(SEC_PLAINTEXT.getValue()) .build(); @@ -96,7 +97,7 @@ final class KafkaProcessorUtils { + "It is ignored unless one of the SASL options of the <Security Protocol> are selected.") .required(false) .addValidator(StandardValidators.NON_BLANK_VALIDATOR) - .expressionLanguageSupported(true) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .build(); static final PropertyDescriptor USER_PRINCIPAL = new PropertyDescriptor.Builder() .name("sasl.kerberos.principal") @@ -105,7 +106,7 @@ final class KafkaProcessorUtils { + "in the JVM properties defined in the bootstrap.conf file. This principal will be set into 'sasl.jaas.config' Kafka's property.") .required(false) .addValidator(StandardValidators.NON_BLANK_VALIDATOR) - .expressionLanguageSupported(true) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .build(); static final PropertyDescriptor USER_KEYTAB = new PropertyDescriptor.Builder() .name("sasl.kerberos.keytab") @@ -114,7 +115,7 @@ final class KafkaProcessorUtils { + "in the JVM properties defined in the bootstrap.conf file. This principal will be set into 'sasl.jaas.config' Kafka's property.") .required(false) .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR) - .expressionLanguageSupported(true) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .build(); static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder() .name("ssl.context.service") http://git-wip-us.apache.org/repos/asf/nifi/blob/4c787799/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_0_10.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_0_10.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_0_10.java index b86f60b..e670c7d 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_0_10.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_0_10.java @@ -43,6 +43,7 @@ import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.DataUnit; @@ -104,7 +105,7 @@ public class PublishKafkaRecord_0_10 extends AbstractProcessor { .description("The name of the Kafka Topic to publish to.") .required(true) .addValidator(StandardValidators.NON_BLANK_VALIDATOR) - .expressionLanguageSupported(true) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .build(); static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder() @@ -112,7 +113,7 @@ public class PublishKafkaRecord_0_10 extends AbstractProcessor { .displayName("Record Reader") .description("The Record Reader to use for incoming FlowFiles") .identifiesControllerService(RecordReaderFactory.class) - .expressionLanguageSupported(false) + .expressionLanguageSupported(ExpressionLanguageScope.NONE) .required(true) .build(); @@ -121,7 +122,7 @@ public class PublishKafkaRecord_0_10 extends AbstractProcessor { .displayName("Record Writer") .description("The Record Writer to use in order to serialize the data before sending to Kafka") .identifiesControllerService(RecordSetWriterFactory.class) - .expressionLanguageSupported(false) + .expressionLanguageSupported(ExpressionLanguageScope.NONE) .required(true) .build(); @@ -130,7 +131,7 @@ public class PublishKafkaRecord_0_10 extends AbstractProcessor { .displayName("Message Key Field") .description("The name of a field in the Input Records that should be used as the Key for the Kafka message.") .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(true) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .required(false) .build(); @@ -139,7 +140,7 @@ public class PublishKafkaRecord_0_10 extends AbstractProcessor { .displayName("Delivery Guarantee") .description("Specifies the requirement for guaranteeing that a message is sent to Kafka. Corresponds to Kafka's 'acks' property.") .required(true) - .expressionLanguageSupported(false) + .expressionLanguageSupported(ExpressionLanguageScope.NONE) .allowableValues(DELIVERY_BEST_EFFORT, DELIVERY_ONE_NODE, DELIVERY_REPLICATED) .defaultValue(DELIVERY_BEST_EFFORT.getValue()) .build(); @@ -151,7 +152,7 @@ public class PublishKafkaRecord_0_10 extends AbstractProcessor { + "entire 'send' call. Corresponds to Kafka's 'max.block.ms' property") .required(true) .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) - .expressionLanguageSupported(true) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .defaultValue("5 sec") .build(); @@ -161,7 +162,7 @@ public class PublishKafkaRecord_0_10 extends AbstractProcessor { .description("After sending a message to Kafka, this indicates the amount of time that we are willing to wait for a response from Kafka. " + "If Kafka does not acknowledge the message within this time period, the FlowFile will be routed to 'failure'.") .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) - .expressionLanguageSupported(false) + .expressionLanguageSupported(ExpressionLanguageScope.NONE) .required(true) .defaultValue("5 secs") .build(); http://git-wip-us.apache.org/repos/asf/nifi/blob/4c787799/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_0_10.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_0_10.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_0_10.java index c9d0f37..5a2716a 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_0_10.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_0_10.java @@ -45,6 +45,7 @@ import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.DataUnit; @@ -100,7 +101,7 @@ public class PublishKafka_0_10 extends AbstractProcessor { .description("The name of the Kafka Topic to publish to.") .required(true) .addValidator(StandardValidators.NON_BLANK_VALIDATOR) - .expressionLanguageSupported(true) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .build(); static final PropertyDescriptor DELIVERY_GUARANTEE = new PropertyDescriptor.Builder() @@ -108,7 +109,7 @@ public class PublishKafka_0_10 extends AbstractProcessor { .displayName("Delivery Guarantee") .description("Specifies the requirement for guaranteeing that a message is sent to Kafka. Corresponds to Kafka's 'acks' property.") .required(true) - .expressionLanguageSupported(false) + .expressionLanguageSupported(ExpressionLanguageScope.NONE) .allowableValues(DELIVERY_BEST_EFFORT, DELIVERY_ONE_NODE, DELIVERY_REPLICATED) .defaultValue(DELIVERY_BEST_EFFORT.getValue()) .build(); @@ -120,7 +121,7 @@ public class PublishKafka_0_10 extends AbstractProcessor { + "entire 'send' call. Corresponds to Kafka's 'max.block.ms' property") .required(true) .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) - .expressionLanguageSupported(true) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .defaultValue("5 sec") .build(); @@ -130,7 +131,7 @@ public class PublishKafka_0_10 extends AbstractProcessor { .description("After sending a message to Kafka, this indicates the amount of time that we are willing to wait for a response from Kafka. " + "If Kafka does not acknowledge the message within this time period, the FlowFile will be routed to 'failure'.") .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) - .expressionLanguageSupported(false) + .expressionLanguageSupported(ExpressionLanguageScope.NONE) .required(true) .defaultValue("5 secs") .build(); @@ -154,7 +155,7 @@ public class PublishKafka_0_10 extends AbstractProcessor { + "data loss on Kafka. During a topic compaction on Kafka, messages will be deduplicated based on this key.") .required(false) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(true) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .build(); static final PropertyDescriptor KEY_ATTRIBUTE_ENCODING = new PropertyDescriptor.Builder() @@ -171,7 +172,7 @@ public class PublishKafka_0_10 extends AbstractProcessor { .displayName("Message Demarcator") .required(false) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(true) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .description("Specifies the string (interpreted as UTF-8) to use for demarcating multiple messages within " + "a single FlowFile. If not specified, the entire content of the FlowFile will be used as a single message. If specified, the " + "contents of the FlowFile will be split on this delimiter and each section sent as a separate Kafka message. " http://git-wip-us.apache.org/repos/asf/nifi/blob/4c787799/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_0_11.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_0_11.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_0_11.java index c4b0920..2b202df 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_0_11.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_0_11.java @@ -45,6 +45,7 @@ import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.ProcessContext; @@ -89,7 +90,7 @@ public class ConsumeKafkaRecord_0_11 extends AbstractProcessor { .description("The name of the Kafka Topic(s) to pull from. More than one can be supplied if comma separated.") .required(true) .addValidator(StandardValidators.NON_BLANK_VALIDATOR) - .expressionLanguageSupported(true) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .build(); static final PropertyDescriptor TOPIC_TYPE = new PropertyDescriptor.Builder() @@ -106,7 +107,7 @@ public class ConsumeKafkaRecord_0_11 extends AbstractProcessor { .displayName("Record Reader") .description("The Record Reader to use for incoming FlowFiles") .identifiesControllerService(RecordReaderFactory.class) - .expressionLanguageSupported(false) + .expressionLanguageSupported(ExpressionLanguageScope.NONE) .required(true) .build(); @@ -115,7 +116,7 @@ public class ConsumeKafkaRecord_0_11 extends AbstractProcessor { .displayName("Record Writer") .description("The Record Writer to use in order to serialize the data before sending to Kafka") .identifiesControllerService(RecordSetWriterFactory.class) - .expressionLanguageSupported(false) + .expressionLanguageSupported(ExpressionLanguageScope.NONE) .required(true) .build(); @@ -125,7 +126,7 @@ public class ConsumeKafkaRecord_0_11 extends AbstractProcessor { .description("A Group ID is used to identify consumers that are within the same consumer group. Corresponds to Kafka's 'group.id' property.") .required(true) .addValidator(StandardValidators.NON_BLANK_VALIDATOR) - .expressionLanguageSupported(false) + .expressionLanguageSupported(ExpressionLanguageScope.NONE) .build(); static final PropertyDescriptor AUTO_OFFSET_RESET = new PropertyDescriptor.Builder() @@ -167,7 +168,7 @@ public class ConsumeKafkaRecord_0_11 extends AbstractProcessor { + "read_uncomitted. This means that messages will be received as soon as they are written to Kafka but will be pulled, even if the producer cancels the transactions. If " + "this value is true, NiFi will not receive any messages for which the producer's transaction was canceled, but this can result in some latency since the consumer must wait " + "for the producer to finish its entire transaction instead of pulling as the messages become available.") - .expressionLanguageSupported(false) + .expressionLanguageSupported(ExpressionLanguageScope.NONE) .allowableValues("true", "false") .defaultValue("true") .required(true) @@ -191,7 +192,7 @@ public class ConsumeKafkaRecord_0_11 extends AbstractProcessor { + "\".*\" if messages are expected to have header values that are unique per message, such as an identifier or timestamp, because it will prevent NiFi from bundling " + "the messages together efficiently.") .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR) - .expressionLanguageSupported(false) + .expressionLanguageSupported(ExpressionLanguageScope.NONE) .required(false) .build(); http://git-wip-us.apache.org/repos/asf/nifi/blob/4c787799/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_0_11.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_0_11.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_0_11.java index a8d0457..29c9ee3 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_0_11.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_0_11.java @@ -44,6 +44,7 @@ import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.ProcessContext; @@ -88,7 +89,7 @@ public class ConsumeKafka_0_11 extends AbstractProcessor { .description("The name of the Kafka Topic(s) to pull from. More than one can be supplied if comma separated.") .required(true) .addValidator(StandardValidators.NON_BLANK_VALIDATOR) - .expressionLanguageSupported(true) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .build(); static final PropertyDescriptor TOPIC_TYPE = new PropertyDescriptor.Builder() @@ -106,7 +107,7 @@ public class ConsumeKafka_0_11 extends AbstractProcessor { .description("A Group ID is used to identify consumers that are within the same consumer group. Corresponds to Kafka's 'group.id' property.") .required(true) .addValidator(StandardValidators.NON_BLANK_VALIDATOR) - .expressionLanguageSupported(false) + .expressionLanguageSupported(ExpressionLanguageScope.NONE) .build(); static final PropertyDescriptor AUTO_OFFSET_RESET = new PropertyDescriptor.Builder() @@ -133,7 +134,7 @@ public class ConsumeKafka_0_11 extends AbstractProcessor { .displayName("Message Demarcator") .required(false) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(true) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .description("Since KafkaConsumer receives messages in batches, you have an option to output FlowFiles which contains " + "all Kafka messages in a single batch for a given topic and partition and this property allows you to provide a string (interpreted as UTF-8) to use " + "for demarcating apart multiple Kafka messages. This is an optional property and if not provided each Kafka message received " @@ -150,7 +151,7 @@ public class ConsumeKafka_0_11 extends AbstractProcessor { + "\".*\" if messages are expected to have header values that are unique per message, such as an identifier or timestamp, because it will prevent NiFi from bundling " + "the messages together efficiently.") .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR) - .expressionLanguageSupported(false) + .expressionLanguageSupported(ExpressionLanguageScope.NONE) .required(false) .build(); @@ -184,7 +185,7 @@ public class ConsumeKafka_0_11 extends AbstractProcessor { + "read_uncomitted. This means that messages will be received as soon as they are written to Kafka but will be pulled, even if the producer cancels the transactions. If " + "this value is true, NiFi will not receive any messages for which the producer's transaction was canceled, but this can result in some latency since the consumer must wait " + "for the producer to finish its entire transaction instead of pulling as the messages become available.") - .expressionLanguageSupported(false) + .expressionLanguageSupported(ExpressionLanguageScope.NONE) .allowableValues("true", "false") .defaultValue("true") .required(true) http://git-wip-us.apache.org/repos/asf/nifi/blob/4c787799/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java index e88f3da..fa7ebcb 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java @@ -41,6 +41,7 @@ import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.components.Validator; +import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.kerberos.KerberosCredentialsService; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.util.StandardValidators; @@ -76,7 +77,7 @@ final class KafkaProcessorUtils { .description("A comma-separated list of known Kafka Brokers in the format <host>:<port>") .required(true) .addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR) - .expressionLanguageSupported(true) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .defaultValue("localhost:9092") .build(); static final PropertyDescriptor SECURITY_PROTOCOL = new PropertyDescriptor.Builder() @@ -84,7 +85,7 @@ final class KafkaProcessorUtils { .displayName("Security Protocol") .description("Protocol used to communicate with brokers. Corresponds to Kafka's 'security.protocol' property.") .required(true) - .expressionLanguageSupported(false) + .expressionLanguageSupported(ExpressionLanguageScope.NONE) .allowableValues(SEC_PLAINTEXT, SEC_SSL, SEC_SASL_PLAINTEXT, SEC_SASL_SSL) .defaultValue(SEC_PLAINTEXT.getValue()) .build(); @@ -96,7 +97,7 @@ final class KafkaProcessorUtils { + "It is ignored unless one of the SASL options of the <Security Protocol> are selected.") .required(false) .addValidator(StandardValidators.NON_BLANK_VALIDATOR) - .expressionLanguageSupported(true) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .build(); static final PropertyDescriptor USER_PRINCIPAL = new PropertyDescriptor.Builder() .name("sasl.kerberos.principal") @@ -105,7 +106,7 @@ final class KafkaProcessorUtils { + "in the JVM properties defined in the bootstrap.conf file. This principal will be set into 'sasl.jaas.config' Kafka's property.") .required(false) .addValidator(StandardValidators.NON_BLANK_VALIDATOR) - .expressionLanguageSupported(true) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .build(); static final PropertyDescriptor USER_KEYTAB = new PropertyDescriptor.Builder() .name("sasl.kerberos.keytab") @@ -114,7 +115,7 @@ final class KafkaProcessorUtils { + "in the JVM properties defined in the bootstrap.conf file. This principal will be set into 'sasl.jaas.config' Kafka's property.") .required(false) .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR) - .expressionLanguageSupported(true) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .build(); static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder() .name("ssl.context.service") http://git-wip-us.apache.org/repos/asf/nifi/blob/4c787799/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_0_11.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_0_11.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_0_11.java index 1521bfa..8bbec17 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_0_11.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_0_11.java @@ -46,6 +46,7 @@ import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.DataUnit; @@ -107,7 +108,7 @@ public class PublishKafkaRecord_0_11 extends AbstractProcessor { .description("The name of the Kafka Topic to publish to.") .required(true) .addValidator(StandardValidators.NON_BLANK_VALIDATOR) - .expressionLanguageSupported(true) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .build(); static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder() @@ -115,7 +116,7 @@ public class PublishKafkaRecord_0_11 extends AbstractProcessor { .displayName("Record Reader") .description("The Record Reader to use for incoming FlowFiles") .identifiesControllerService(RecordReaderFactory.class) - .expressionLanguageSupported(false) + .expressionLanguageSupported(ExpressionLanguageScope.NONE) .required(true) .build(); @@ -124,7 +125,7 @@ public class PublishKafkaRecord_0_11 extends AbstractProcessor { .displayName("Record Writer") .description("The Record Writer to use in order to serialize the data before sending to Kafka") .identifiesControllerService(RecordSetWriterFactory.class) - .expressionLanguageSupported(false) + .expressionLanguageSupported(ExpressionLanguageScope.NONE) .required(true) .build(); @@ -133,7 +134,7 @@ public class PublishKafkaRecord_0_11 extends AbstractProcessor { .displayName("Message Key Field") .description("The name of a field in the Input Records that should be used as the Key for the Kafka message.") .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(true) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .required(false) .build(); @@ -142,7 +143,7 @@ public class PublishKafkaRecord_0_11 extends AbstractProcessor { .displayName("Delivery Guarantee") .description("Specifies the requirement for guaranteeing that a message is sent to Kafka. Corresponds to Kafka's 'acks' property.") .required(true) - .expressionLanguageSupported(false) + .expressionLanguageSupported(ExpressionLanguageScope.NONE) .allowableValues(DELIVERY_BEST_EFFORT, DELIVERY_ONE_NODE, DELIVERY_REPLICATED) .defaultValue(DELIVERY_BEST_EFFORT.getValue()) .build(); @@ -154,7 +155,7 @@ public class PublishKafkaRecord_0_11 extends AbstractProcessor { + "entire 'send' call. Corresponds to Kafka's 'max.block.ms' property") .required(true) .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) - .expressionLanguageSupported(true) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .defaultValue("5 sec") .build(); @@ -164,7 +165,7 @@ public class PublishKafkaRecord_0_11 extends AbstractProcessor { .description("After sending a message to Kafka, this indicates the amount of time that we are willing to wait for a response from Kafka. " + "If Kafka does not acknowledge the message within this time period, the FlowFile will be routed to 'failure'.") .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) - .expressionLanguageSupported(false) + .expressionLanguageSupported(ExpressionLanguageScope.NONE) .required(true) .defaultValue("5 secs") .build(); @@ -204,7 +205,7 @@ public class PublishKafkaRecord_0_11 extends AbstractProcessor { + "Any attribute whose name matches the regex will be added to the Kafka messages as a Header. " + "If not specified, no FlowFile attributes will be added as headers.") .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR) - .expressionLanguageSupported(false) + .expressionLanguageSupported(ExpressionLanguageScope.NONE) .required(false) .build(); static final PropertyDescriptor USE_TRANSACTIONS = new PropertyDescriptor.Builder() @@ -214,7 +215,7 @@ public class PublishKafkaRecord_0_11 extends AbstractProcessor { + "and this property is set to false, then the messages that have already been sent to Kafka will continue on and be delivered to consumers. " + "If this is set to true, then the Kafka transaction will be rolled back so that those messages are not available to consumers. Setting this to true " + "requires that the <Delivery Guarantee> property be set to \"Guarantee Replicated Delivery.\"") - .expressionLanguageSupported(false) + .expressionLanguageSupported(ExpressionLanguageScope.NONE) .allowableValues("true", "false") .defaultValue("true") .required(true) http://git-wip-us.apache.org/repos/asf/nifi/blob/4c787799/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_0_11.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_0_11.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_0_11.java index 130954a..5efe913 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_0_11.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_0_11.java @@ -47,6 +47,7 @@ import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.DataUnit; @@ -102,7 +103,7 @@ public class PublishKafka_0_11 extends AbstractProcessor { .description("The name of the Kafka Topic to publish to.") .required(true) .addValidator(StandardValidators.NON_BLANK_VALIDATOR) - .expressionLanguageSupported(true) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .build(); static final PropertyDescriptor DELIVERY_GUARANTEE = new PropertyDescriptor.Builder() @@ -110,7 +111,7 @@ public class PublishKafka_0_11 extends AbstractProcessor { .displayName("Delivery Guarantee") .description("Specifies the requirement for guaranteeing that a message is sent to Kafka. Corresponds to Kafka's 'acks' property.") .required(true) - .expressionLanguageSupported(false) + .expressionLanguageSupported(ExpressionLanguageScope.NONE) .allowableValues(DELIVERY_BEST_EFFORT, DELIVERY_ONE_NODE, DELIVERY_REPLICATED) .defaultValue(DELIVERY_BEST_EFFORT.getValue()) .build(); @@ -122,7 +123,7 @@ public class PublishKafka_0_11 extends AbstractProcessor { + "entire 'send' call. Corresponds to Kafka's 'max.block.ms' property") .required(true) .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) - .expressionLanguageSupported(true) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .defaultValue("5 sec") .build(); @@ -132,7 +133,7 @@ public class PublishKafka_0_11 extends AbstractProcessor { .description("After sending a message to Kafka, this indicates the amount of time that we are willing to wait for a response from Kafka. " + "If Kafka does not acknowledge the message within this time period, the FlowFile will be routed to 'failure'.") .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) - .expressionLanguageSupported(false) + .expressionLanguageSupported(ExpressionLanguageScope.NONE) .required(true) .defaultValue("5 secs") .build(); @@ -156,7 +157,7 @@ public class PublishKafka_0_11 extends AbstractProcessor { + "data loss on Kafka. During a topic compaction on Kafka, messages will be deduplicated based on this key.") .required(false) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(true) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .build(); static final PropertyDescriptor KEY_ATTRIBUTE_ENCODING = new PropertyDescriptor.Builder() @@ -173,7 +174,7 @@ public class PublishKafka_0_11 extends AbstractProcessor { .displayName("Message Demarcator") .required(false) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(true) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .description("Specifies the string (interpreted as UTF-8) to use for demarcating multiple messages within " + "a single FlowFile. If not specified, the entire content of the FlowFile will be used as a single message. If specified, the " + "contents of the FlowFile will be split on this delimiter and each section sent as a separate Kafka message. " @@ -206,7 +207,7 @@ public class PublishKafka_0_11 extends AbstractProcessor { + "Any attribute whose name matches the regex will be added to the Kafka messages as a Header. " + "If not specified, no FlowFile attributes will be added as headers.") .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR) - .expressionLanguageSupported(false) + .expressionLanguageSupported(ExpressionLanguageScope.NONE) .required(false) .build(); static final PropertyDescriptor USE_TRANSACTIONS = new PropertyDescriptor.Builder() @@ -216,7 +217,7 @@ public class PublishKafka_0_11 extends AbstractProcessor { + "and this property is set to false, then the messages that have already been sent to Kafka will continue on and be delivered to consumers. " + "If this is set to true, then the Kafka transaction will be rolled back so that those messages are not available to consumers. Setting this to true " + "requires that the <Delivery Guarantee> property be set to \"Guarantee Replicated Delivery.\"") - .expressionLanguageSupported(false) + .expressionLanguageSupported(ExpressionLanguageScope.NONE) .allowableValues("true", "false") .defaultValue("true") .required(true) http://git-wip-us.apache.org/repos/asf/nifi/blob/4c787799/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java index b1eadcf..e01afdd 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java @@ -50,6 +50,7 @@ import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.Validator; +import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.ProcessContext; @@ -93,14 +94,14 @@ public class GetKafka extends AbstractProcessor { + " combinations. For example, host1:2181,host2:2181,host3:2188") .required(true) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(true) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .build(); public static final PropertyDescriptor TOPIC = new PropertyDescriptor.Builder() .name("Topic Name") .description("The Kafka Topic to pull messages from") .required(true) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(true) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .build(); public static final PropertyDescriptor ZOOKEEPER_COMMIT_DELAY = new PropertyDescriptor.Builder() .name("Zookeeper Commit Frequency") @@ -108,7 +109,7 @@ public class GetKafka extends AbstractProcessor { + " result in better overall performance but can result in more data duplication if a NiFi node is lost") .required(true) .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) - .expressionLanguageSupported(false) + .expressionLanguageSupported(ExpressionLanguageScope.NONE) .defaultValue("60 secs") .build(); public static final PropertyDescriptor ZOOKEEPER_TIMEOUT = new PropertyDescriptor.Builder() @@ -116,7 +117,7 @@ public class GetKafka extends AbstractProcessor { .description("The amount of time to wait for a response from ZooKeeper before determining that there is a communications error") .required(true) .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) - .expressionLanguageSupported(false) + .expressionLanguageSupported(ExpressionLanguageScope.NONE) .defaultValue("30 secs") .build(); public static final PropertyDescriptor KAFKA_TIMEOUT = new PropertyDescriptor.Builder() @@ -124,7 +125,7 @@ public class GetKafka extends AbstractProcessor { .description("The amount of time to wait for a response from Kafka before determining that there is a communications error") .required(true) .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) - .expressionLanguageSupported(false) + .expressionLanguageSupported(ExpressionLanguageScope.NONE) .defaultValue("30 secs") .build(); public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder() @@ -134,7 +135,7 @@ public class GetKafka extends AbstractProcessor { + "If the messages from Kafka should not be concatenated together, leave this value at 1.") .required(true) .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) - .expressionLanguageSupported(false) + .expressionLanguageSupported(ExpressionLanguageScope.NONE) .defaultValue("1") .build(); public static final PropertyDescriptor MESSAGE_DEMARCATOR = new PropertyDescriptor.Builder() @@ -144,7 +145,7 @@ public class GetKafka extends AbstractProcessor { + "this value will be placed in between them.") .required(true) .addValidator(Validator.VALID) // accept anything as a demarcator, including empty string - .expressionLanguageSupported(false) + .expressionLanguageSupported(ExpressionLanguageScope.NONE) .defaultValue("\\n") .build(); @@ -153,14 +154,14 @@ public class GetKafka extends AbstractProcessor { .description("Client Name to use when communicating with Kafka") .required(true) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(false) + .expressionLanguageSupported(ExpressionLanguageScope.NONE) .build(); public static final PropertyDescriptor GROUP_ID = new PropertyDescriptor.Builder() .name("Group ID") .description("A Group ID is used to identify consumers that are within the same consumer group") .required(true) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(true) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .build(); public static final PropertyDescriptor AUTO_OFFSET_RESET = new PropertyDescriptor.Builder() http://git-wip-us.apache.org/repos/asf/nifi/blob/4c787799/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java index ab24e57..be93736 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java @@ -38,6 +38,7 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.processor.DataUnit; import org.apache.nifi.processor.ProcessContext; @@ -112,14 +113,14 @@ public class PutKafka extends AbstractKafkaProcessor<KafkaPublisher> { .description("A comma-separated list of known Kafka Brokers in the format <host>:<port>") .required(true) .addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR) - .expressionLanguageSupported(true) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .build(); public static final PropertyDescriptor TOPIC = new PropertyDescriptor.Builder() .name("Topic Name") .description("The Kafka Topic of interest") .required(true) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(true) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .build(); /** * @deprecated Kafka 0.8.x producer doesn't use 'partitioner.class' property. @@ -137,7 +138,7 @@ public class PutKafka extends AbstractKafkaProcessor<KafkaPublisher> { + "in the same FlowFile will be sent to the same partition. If a partition is specified but is not valid, " + "then the FlowFile will be routed to failure relationship.") .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) - .expressionLanguageSupported(true) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .required(false) .build(); public static final PropertyDescriptor KEY = new PropertyDescriptor.Builder() @@ -145,12 +146,12 @@ public class PutKafka extends AbstractKafkaProcessor<KafkaPublisher> { .description("The Key to use for the Message") .required(false) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(true) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .build(); public static final PropertyDescriptor DELIVERY_GUARANTEE = new PropertyDescriptor.Builder() .name("Delivery Guarantee") .description("Specifies the requirement for guaranteeing that a message is sent to Kafka").required(true) - .expressionLanguageSupported(false) + .expressionLanguageSupported(ExpressionLanguageScope.NONE) .allowableValues(DELIVERY_BEST_EFFORT, DELIVERY_ONE_NODE, DELIVERY_REPLICATED) .defaultValue(DELIVERY_BEST_EFFORT.getValue()) .build(); @@ -165,14 +166,14 @@ public class PutKafka extends AbstractKafkaProcessor<KafkaPublisher> { + "sent to the 'failure' relationship.") .required(false) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(true) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .build(); public static final PropertyDescriptor MAX_BUFFER_SIZE = new PropertyDescriptor.Builder() .name("Max Buffer Size") .description("The maximum amount of data to buffer in memory before sending to Kafka") .required(true) .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) - .expressionLanguageSupported(false) + .expressionLanguageSupported(ExpressionLanguageScope.NONE) .defaultValue("5 MB") .build(); static final PropertyDescriptor MAX_RECORD_SIZE = new PropertyDescriptor.Builder() @@ -187,14 +188,14 @@ public class PutKafka extends AbstractKafkaProcessor<KafkaPublisher> { .description("The amount of time to wait for a response from Kafka before determining that there is a communications error") .required(true) .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) - .expressionLanguageSupported(false) + .expressionLanguageSupported(ExpressionLanguageScope.NONE) .defaultValue("30 secs").build(); public static final PropertyDescriptor CLIENT_NAME = new PropertyDescriptor.Builder() .name("Client Name") .description("Client Name to use when communicating with Kafka") .required(true) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(false) + .expressionLanguageSupported(ExpressionLanguageScope.NONE) .build(); public static final PropertyDescriptor BATCH_NUM_MESSAGES = new PropertyDescriptor.Builder() .name("Async Batch Size")