Repository: nifi Updated Branches: refs/heads/master b1901d5fe -> afd2b04af
NIFI-3852 Add expression language support to Cassandra processors Remove unused import Signed-off-by: Matt Burgess <[email protected]> This closes #1770 Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/afd2b04a Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/afd2b04a Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/afd2b04a Branch: refs/heads/master Commit: afd2b04afde2e843f46718ede01e3f8e31d2780c Parents: b1901d5 Author: Tim Reardon <[email protected]> Authored: Tue May 9 12:39:18 2017 -0400 Committer: Matt Burgess <[email protected]> Committed: Fri May 12 13:52:07 2017 -0400 ---------------------------------------------------------------------- .../cassandra/AbstractCassandraProcessor.java | 20 ++++--- .../processors/cassandra/PutCassandraQL.java | 5 +- .../processors/cassandra/QueryCassandra.java | 8 ++- .../AbstractCassandraProcessorTest.java | 13 ++++- .../cassandra/PutCassandraQLTest.java | 59 ++++++++++++++++++++ .../cassandra/QueryCassandraTest.java | 47 ++++++++++++++++ 6 files changed, 137 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/afd2b04a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/AbstractCassandraProcessor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/AbstractCassandraProcessor.java b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/AbstractCassandraProcessor.java index 1dc1809..4b69e65 100644 --- a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/AbstractCassandraProcessor.java +++ b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/AbstractCassandraProcessor.java @@ -47,7 +47,6 @@ import java.util.Arrays; import java.util.Collection; import java.util.HashSet; import java.util.List; -import java.util.Map; import java.util.Set; import java.util.concurrent.atomic.AtomicReference; @@ -66,7 +65,7 @@ public abstract class AbstractCassandraProcessor extends AbstractProcessor { + "comma-separated and in hostname:port format. Example node1:port,node2:port,...." + " The default client port for Cassandra is 9042, but the port(s) must be explicitly specified.") .required(true) - .expressionLanguageSupported(false) + .expressionLanguageSupported(true) .addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR) .build(); @@ -75,6 +74,7 @@ public abstract class AbstractCassandraProcessor extends AbstractProcessor { .description("The Cassandra Keyspace to connect to. If no keyspace is specified, the query will need to " + "include the keyspace name before any table reference.") .required(false) + .expressionLanguageSupported(true) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); @@ -100,6 +100,7 @@ public abstract class AbstractCassandraProcessor extends AbstractProcessor { .name("Username") .description("Username to access the Cassandra cluster") .required(false) + .expressionLanguageSupported(true) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); @@ -108,6 +109,7 @@ public abstract class AbstractCassandraProcessor extends AbstractProcessor { .description("Password to access the Cassandra cluster") .required(false) .sensitive(true) + .expressionLanguageSupported(true) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); @@ -123,6 +125,7 @@ public abstract class AbstractCassandraProcessor extends AbstractProcessor { .name("Character Set") .description("Specifies the character set of the record data.") .required(true) + .expressionLanguageSupported(true) .defaultValue("UTF-8") .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR) .build(); @@ -150,8 +153,9 @@ public abstract class AbstractCassandraProcessor extends AbstractProcessor { Set<ValidationResult> results = new HashSet<>(); // Ensure that if username or password is set, then the other is too - Map<PropertyDescriptor, String> propertyMap = validationContext.getProperties(); - if (StringUtils.isEmpty(propertyMap.get(USERNAME)) != StringUtils.isEmpty(propertyMap.get(PASSWORD))) { + String userName = validationContext.getProperty(USERNAME).evaluateAttributeExpressions().getValue(); + String password = validationContext.getProperty(PASSWORD).evaluateAttributeExpressions().getValue(); + if (StringUtils.isEmpty(userName) != StringUtils.isEmpty(password)) { results.add(new ValidationResult.Builder().valid(false).explanation( "If username or password is specified, then the other must be specified as well").build()); } @@ -162,7 +166,7 @@ public abstract class AbstractCassandraProcessor extends AbstractProcessor { protected void connectToCassandra(ProcessContext context) { if (cluster.get() == null) { ComponentLog log = getLogger(); - final String contactPointList = context.getProperty(CONTACT_POINTS).getValue(); + final String contactPointList = context.getProperty(CONTACT_POINTS).evaluateAttributeExpressions().getValue(); final String consistencyLevel = context.getProperty(CONSISTENCY_LEVEL).getValue(); List<InetSocketAddress> contactPoints = getContactPoints(contactPointList); @@ -190,8 +194,8 @@ public abstract class AbstractCassandraProcessor extends AbstractProcessor { } final String username, password; - PropertyValue usernameProperty = context.getProperty(USERNAME); - PropertyValue passwordProperty = context.getProperty(PASSWORD); + PropertyValue usernameProperty = context.getProperty(USERNAME).evaluateAttributeExpressions(); + PropertyValue passwordProperty = context.getProperty(PASSWORD).evaluateAttributeExpressions(); if (usernameProperty != null && passwordProperty != null) { username = usernameProperty.getValue(); @@ -203,7 +207,7 @@ public abstract class AbstractCassandraProcessor extends AbstractProcessor { // Create the cluster and connect to it Cluster newCluster = createCluster(contactPoints, sslContext, username, password); - PropertyValue keyspaceProperty = context.getProperty(KEYSPACE); + PropertyValue keyspaceProperty = context.getProperty(KEYSPACE).evaluateAttributeExpressions(); final Session newSession; if (keyspaceProperty != null) { newSession = newCluster.connect(keyspaceProperty.getValue()); http://git-wip-us.apache.org/repos/asf/nifi/blob/afd2b04a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraQL.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraQL.java b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraQL.java index 2f680a6..81fd6c8 100644 --- a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraQL.java +++ b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraQL.java @@ -95,6 +95,7 @@ public class PutCassandraQL extends AbstractCassandraProcessor { + "Time Unit, such as: nanos, millis, secs, mins, hrs, days. A value of zero means there is no limit. ") .defaultValue("0 seconds") .required(true) + .expressionLanguageSupported(true) .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) .build(); @@ -177,8 +178,8 @@ public class PutCassandraQL extends AbstractCassandraProcessor { } final long startNanos = System.nanoTime(); - final long statementTimeout = context.getProperty(STATEMENT_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS); - final Charset charset = Charset.forName(context.getProperty(CHARSET).getValue()); + final long statementTimeout = context.getProperty(STATEMENT_TIMEOUT).evaluateAttributeExpressions(flowFile).asTimePeriod(TimeUnit.MILLISECONDS); + final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(flowFile).getValue()); // The documentation for the driver recommends the session remain open the entire time the processor is running // and states that it is thread-safe. This is why connectionSession is not in a try-with-resources. http://git-wip-us.apache.org/repos/asf/nifi/blob/afd2b04a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java index ca7f690..7387334 100644 --- a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java +++ b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java @@ -103,6 +103,7 @@ public class QueryCassandra extends AbstractCassandraProcessor { + "Time Unit, such as: nanos, millis, secs, mins, hrs, days. A value of zero means there is no limit. ") .defaultValue("0 seconds") .required(true) + .expressionLanguageSupported(true) .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) .build(); @@ -112,6 +113,7 @@ public class QueryCassandra extends AbstractCassandraProcessor { + "and means there is no limit.") .defaultValue("0") .required(true) + .expressionLanguageSupported(true) .addValidator(StandardValidators.INTEGER_VALIDATOR) .build(); @@ -178,7 +180,7 @@ public class QueryCassandra extends AbstractCassandraProcessor { ComponentLog log = getLogger(); try { connectToCassandra(context); - final int fetchSize = context.getProperty(FETCH_SIZE).asInteger(); + final int fetchSize = context.getProperty(FETCH_SIZE).evaluateAttributeExpressions().asInteger(); if (fetchSize > 0) { synchronized (cluster.get()) { cluster.get().getConfiguration().getQueryOptions().setFetchSize(fetchSize); @@ -214,9 +216,9 @@ public class QueryCassandra extends AbstractCassandraProcessor { final ComponentLog logger = getLogger(); final String selectQuery = context.getProperty(CQL_SELECT_QUERY).evaluateAttributeExpressions(fileToProcess).getValue(); - final long queryTimeout = context.getProperty(QUERY_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS); + final long queryTimeout = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions(fileToProcess).asTimePeriod(TimeUnit.MILLISECONDS); final String outputFormat = context.getProperty(OUTPUT_FORMAT).getValue(); - final Charset charset = Charset.forName(context.getProperty(CHARSET).getValue()); + final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(fileToProcess).getValue()); final StopWatch stopWatch = new StopWatch(true); if (fileToProcess == null) { http://git-wip-us.apache.org/repos/asf/nifi/blob/afd2b04a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/AbstractCassandraProcessorTest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/AbstractCassandraProcessorTest.java b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/AbstractCassandraProcessorTest.java index 3b0e273..aeb8f59 100644 --- a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/AbstractCassandraProcessorTest.java +++ b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/AbstractCassandraProcessorTest.java @@ -88,6 +88,16 @@ public class AbstractCassandraProcessorTest { testRunner.assertValid(); } + @Test + public void testCustomValidateEL() throws Exception { + testRunner.setProperty(AbstractCassandraProcessor.CONTACT_POINTS, "${host}"); + testRunner.setProperty(AbstractCassandraProcessor.KEYSPACE, "${keyspace}"); + testRunner.setProperty(AbstractCassandraProcessor.USERNAME, "${user}"); + testRunner.setProperty(AbstractCassandraProcessor.PASSWORD, "${password}"); + testRunner.setProperty(AbstractCassandraProcessor.CHARSET, "${charset}"); + testRunner.assertValid(); + } + @SuppressWarnings("unchecked") @Test public void testGetCassandraObject() throws Exception { @@ -247,7 +257,6 @@ public class AbstractCassandraProcessorTest { assertNotNull(processor.getCluster()); } - /** * Provides a stubbed processor instance for testing */ @@ -255,7 +264,7 @@ public class AbstractCassandraProcessorTest { @Override protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { - return Arrays.asList(CONTACT_POINTS, USERNAME, PASSWORD, CONSISTENCY_LEVEL); + return Arrays.asList(CONTACT_POINTS, KEYSPACE, USERNAME, PASSWORD, CONSISTENCY_LEVEL, CHARSET); } @Override http://git-wip-us.apache.org/repos/asf/nifi/blob/afd2b04a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/PutCassandraQLTest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/PutCassandraQLTest.java b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/PutCassandraQLTest.java index b3e4fe2..de66235 100644 --- a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/PutCassandraQLTest.java +++ b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/PutCassandraQLTest.java @@ -77,6 +77,17 @@ public class PutCassandraQLTest { } @Test + public void testProcessorELConfigValidity() { + testRunner.setProperty(AbstractCassandraProcessor.CONTACT_POINTS, "${hosts}"); + testRunner.setProperty(AbstractCassandraProcessor.PASSWORD, "${pass}"); + testRunner.setProperty(AbstractCassandraProcessor.USERNAME, "${user}"); + testRunner.setProperty(AbstractCassandraProcessor.CHARSET, "${charset}"); + testRunner.setProperty(PutCassandraQL.STATEMENT_TIMEOUT, "${timeout}"); + + testRunner.assertValid(); + } + + @Test public void testProcessorHappyPath() { setUpStandardTestConfig(); @@ -113,6 +124,54 @@ public class PutCassandraQLTest { } @Test + public void testProcessorHappyPathELConfig() { + testRunner.setProperty(AbstractCassandraProcessor.CONTACT_POINTS, "${hosts}"); + testRunner.setProperty(AbstractCassandraProcessor.PASSWORD, "${pass}"); + testRunner.setProperty(AbstractCassandraProcessor.USERNAME, "${user}"); + testRunner.setProperty(AbstractCassandraProcessor.CONSISTENCY_LEVEL, "ONE"); + testRunner.setProperty(AbstractCassandraProcessor.CHARSET, "${charset}"); + testRunner.setProperty(PutCassandraQL.STATEMENT_TIMEOUT, "${timeout}"); + testRunner.assertValid(); + + testRunner.setVariable("hosts", "localhost:9042"); + testRunner.setVariable("user", "username"); + testRunner.setVariable("pass", "password"); + testRunner.setVariable("charset", "UTF-8"); + testRunner.setVariable("timeout", "30 sec"); + + testRunner.enqueue("INSERT INTO users (user_id, first_name, last_name, properties, bits, scaleset, largenum, scale, byteobject, ts) VALUES ?, ?, ?, ?, ?, ?, ?, ?, ?, ?", + new HashMap<String, String>() { + { + put("cql.args.1.type", "int"); + put("cql.args.1.value", "1"); + put("cql.args.2.type", "text"); + put("cql.args.2.value", "Joe"); + put("cql.args.3.type", "text"); + // No value for arg 3 to test setNull + put("cql.args.4.type", "map<text,text>"); + put("cql.args.4.value", "{'a':'Hello', 'b':'World'}"); + put("cql.args.5.type", "list<boolean>"); + put("cql.args.5.value", "[true,false,true]"); + put("cql.args.6.type", "set<double>"); + put("cql.args.6.value", "{1.0, 2.0}"); + put("cql.args.7.type", "bigint"); + put("cql.args.7.value", "20000000"); + put("cql.args.8.type", "float"); + put("cql.args.8.value", "1.0"); + put("cql.args.9.type", "blob"); + put("cql.args.9.value", "0xDEADBEEF"); + put("cql.args.10.type", "timestamp"); + put("cql.args.10.value", "2016-07-01T15:21:05Z"); + + } + }); + + testRunner.run(1, true, true); + testRunner.assertAllFlowFilesTransferred(PutCassandraQL.REL_SUCCESS, 1); + testRunner.clearTransferState(); + } + + @Test public void testProcessorBadTimestamp() { setUpStandardTestConfig(); processor.setExceptionToThrow( http://git-wip-us.apache.org/repos/asf/nifi/blob/afd2b04a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/QueryCassandraTest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/QueryCassandraTest.java b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/QueryCassandraTest.java index 023f239..5cd54e9 100644 --- a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/QueryCassandraTest.java +++ b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/QueryCassandraTest.java @@ -79,6 +79,16 @@ public class QueryCassandraTest { } @Test + public void testProcessorELConfigValid() { + testRunner.setProperty(AbstractCassandraProcessor.CONSISTENCY_LEVEL, "ONE"); + testRunner.setProperty(AbstractCassandraProcessor.CONTACT_POINTS, "${hosts}"); + testRunner.setProperty(QueryCassandra.CQL_SELECT_QUERY, "${query}"); + testRunner.setProperty(AbstractCassandraProcessor.PASSWORD, "${pass}"); + testRunner.setProperty(AbstractCassandraProcessor.USERNAME, "${user}"); + testRunner.assertValid(); + } + + @Test public void testProcessorNoInputFlowFileAndExceptions() { setUpStandardProcessorConfig(); @@ -140,6 +150,43 @@ public class QueryCassandraTest { } @Test + public void testProcessorELConfigJsonOutput() { + testRunner.setProperty(AbstractCassandraProcessor.CONTACT_POINTS, "${hosts}"); + testRunner.setProperty(QueryCassandra.CQL_SELECT_QUERY, "${query}"); + testRunner.setProperty(AbstractCassandraProcessor.PASSWORD, "${pass}"); + testRunner.setProperty(AbstractCassandraProcessor.USERNAME, "${user}"); + testRunner.setProperty(AbstractCassandraProcessor.CHARSET, "${charset}"); + testRunner.setProperty(QueryCassandra.QUERY_TIMEOUT, "${timeout}"); + testRunner.setProperty(QueryCassandra.FETCH_SIZE, "${fetch}"); + testRunner.setIncomingConnection(false); + testRunner.assertValid(); + + testRunner.setVariable("hosts", "localhost:9042"); + testRunner.setVariable("user", "username"); + testRunner.setVariable("pass", "password"); + testRunner.setVariable("charset", "UTF-8"); + testRunner.setVariable("timeout", "30 sec"); + testRunner.setVariable("fetch", "0"); + + // Test JSON output + testRunner.setProperty(QueryCassandra.OUTPUT_FORMAT, QueryCassandra.JSON_FORMAT); + testRunner.run(1, true, true); + testRunner.assertAllFlowFilesTransferred(QueryCassandra.REL_SUCCESS, 1); + List<MockFlowFile> files = testRunner.getFlowFilesForRelationship(QueryCassandra.REL_SUCCESS); + assertNotNull(files); + assertEquals("One file should be transferred to success", 1, files.size()); + assertEquals("{\"results\":[{\"user_id\":\"user1\",\"first_name\":\"Joe\",\"last_name\":\"Smith\"," + + "\"emails\":[\"[email protected]\"],\"top_places\":[\"New York, NY\",\"Santa Clara, CA\"]," + + "\"todo\":{\"2016-01-03 05:00:00+0000\":\"Set my alarm for a month from now\"}," + + "\"registered\":\"false\",\"scale\":1.0,\"metric\":2.0}," + + "{\"user_id\":\"user2\",\"first_name\":\"Mary\",\"last_name\":\"Jones\"," + + "\"emails\":[\"[email protected]\"],\"top_places\":[\"Orlando, FL\"]," + + "\"todo\":{\"2016-02-03 05:00:00+0000\":\"Get milk and bread\"}," + + "\"registered\":\"true\",\"scale\":3.0,\"metric\":4.0}]}", + new String(files.get(0).toByteArray())); + } + + @Test public void testProcessorJsonOutputWithQueryTimeout() { setUpStandardProcessorConfig(); testRunner.setProperty(QueryCassandra.QUERY_TIMEOUT, "5 sec");
