Repository: nifi
Updated Branches:
  refs/heads/master b1901d5fe -> afd2b04af


NIFI-3852 Add expression language support to Cassandra processors

Remove unused import

Signed-off-by: Matt Burgess <[email protected]>

This closes #1770


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/afd2b04a
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/afd2b04a
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/afd2b04a

Branch: refs/heads/master
Commit: afd2b04afde2e843f46718ede01e3f8e31d2780c
Parents: b1901d5
Author: Tim Reardon <[email protected]>
Authored: Tue May 9 12:39:18 2017 -0400
Committer: Matt Burgess <[email protected]>
Committed: Fri May 12 13:52:07 2017 -0400

----------------------------------------------------------------------
 .../cassandra/AbstractCassandraProcessor.java   | 20 ++++---
 .../processors/cassandra/PutCassandraQL.java    |  5 +-
 .../processors/cassandra/QueryCassandra.java    |  8 ++-
 .../AbstractCassandraProcessorTest.java         | 13 ++++-
 .../cassandra/PutCassandraQLTest.java           | 59 ++++++++++++++++++++
 .../cassandra/QueryCassandraTest.java           | 47 ++++++++++++++++
 6 files changed, 137 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/afd2b04a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/AbstractCassandraProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/AbstractCassandraProcessor.java b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/AbstractCassandraProcessor.java
index 1dc1809..4b69e65 100644
--- a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/AbstractCassandraProcessor.java
+++ b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/AbstractCassandraProcessor.java
@@ -47,7 +47,6 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -66,7 +65,7 @@ public abstract class AbstractCassandraProcessor extends AbstractProcessor {
                     + "comma-separated and in hostname:port format. Example node1:port,node2:port,...."
                     + " The default client port for Cassandra is 9042, but the port(s) must be explicitly specified.")
             .required(true)
-            .expressionLanguageSupported(false)
+            .expressionLanguageSupported(true)
             .addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR)
             .build();
 
@@ -75,6 +74,7 @@ public abstract class AbstractCassandraProcessor extends AbstractProcessor {
             .description("The Cassandra Keyspace to connect to. If no keyspace is specified, the query will need to "
                     + "include the keyspace name before any table reference.")
             .required(false)
+            .expressionLanguageSupported(true)
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .build();
 
@@ -100,6 +100,7 @@ public abstract class AbstractCassandraProcessor extends AbstractProcessor {
             .name("Username")
             .description("Username to access the Cassandra cluster")
             .required(false)
+            .expressionLanguageSupported(true)
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .build();
 
@@ -108,6 +109,7 @@ public abstract class AbstractCassandraProcessor extends AbstractProcessor {
             .description("Password to access the Cassandra cluster")
             .required(false)
             .sensitive(true)
+            .expressionLanguageSupported(true)
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .build();
 
@@ -123,6 +125,7 @@ public abstract class AbstractCassandraProcessor extends AbstractProcessor {
             .name("Character Set")
             .description("Specifies the character set of the record data.")
             .required(true)
+            .expressionLanguageSupported(true)
             .defaultValue("UTF-8")
             .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
             .build();
@@ -150,8 +153,9 @@ public abstract class AbstractCassandraProcessor extends AbstractProcessor {
         Set<ValidationResult> results = new HashSet<>();
 
         // Ensure that if username or password is set, then the other is too
-        Map<PropertyDescriptor, String> propertyMap = validationContext.getProperties();
-        if (StringUtils.isEmpty(propertyMap.get(USERNAME)) != StringUtils.isEmpty(propertyMap.get(PASSWORD))) {
+        String userName = validationContext.getProperty(USERNAME).evaluateAttributeExpressions().getValue();
+        String password = validationContext.getProperty(PASSWORD).evaluateAttributeExpressions().getValue();
+        if (StringUtils.isEmpty(userName) != StringUtils.isEmpty(password)) {
             results.add(new ValidationResult.Builder().valid(false).explanation(
                     "If username or password is specified, then the other must be specified as well").build());
         }
@@ -162,7 +166,7 @@ public abstract class AbstractCassandraProcessor extends AbstractProcessor {
     protected void connectToCassandra(ProcessContext context) {
         if (cluster.get() == null) {
             ComponentLog log = getLogger();
-            final String contactPointList = context.getProperty(CONTACT_POINTS).getValue();
+            final String contactPointList = context.getProperty(CONTACT_POINTS).evaluateAttributeExpressions().getValue();
             final String consistencyLevel = context.getProperty(CONSISTENCY_LEVEL).getValue();
             List<InetSocketAddress> contactPoints = getContactPoints(contactPointList);
 
@@ -190,8 +194,8 @@ public abstract class AbstractCassandraProcessor extends AbstractProcessor {
             }
 
             final String username, password;
-            PropertyValue usernameProperty = context.getProperty(USERNAME);
-            PropertyValue passwordProperty = context.getProperty(PASSWORD);
+            PropertyValue usernameProperty = context.getProperty(USERNAME).evaluateAttributeExpressions();
+            PropertyValue passwordProperty = context.getProperty(PASSWORD).evaluateAttributeExpressions();
 
             if (usernameProperty != null && passwordProperty != null) {
                 username = usernameProperty.getValue();
@@ -203,7 +207,7 @@ public abstract class AbstractCassandraProcessor extends AbstractProcessor {
 
             // Create the cluster and connect to it
             Cluster newCluster = createCluster(contactPoints, sslContext, username, password);
-            PropertyValue keyspaceProperty = context.getProperty(KEYSPACE);
+            PropertyValue keyspaceProperty = context.getProperty(KEYSPACE).evaluateAttributeExpressions();
             final Session newSession;
             if (keyspaceProperty != null) {
                 newSession = newCluster.connect(keyspaceProperty.getValue());

http://git-wip-us.apache.org/repos/asf/nifi/blob/afd2b04a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraQL.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraQL.java b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraQL.java
index 2f680a6..81fd6c8 100644
--- a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraQL.java
+++ b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraQL.java
@@ -95,6 +95,7 @@ public class PutCassandraQL extends AbstractCassandraProcessor {
                     + "Time Unit, such as: nanos, millis, secs, mins, hrs, days. A value of zero means there is no limit. ")
             .defaultValue("0 seconds")
             .required(true)
+            .expressionLanguageSupported(true)
             .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
             .build();
 
@@ -177,8 +178,8 @@ public class PutCassandraQL extends AbstractCassandraProcessor {
         }
 
         final long startNanos = System.nanoTime();
-        final long statementTimeout = context.getProperty(STATEMENT_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS);
-        final Charset charset = Charset.forName(context.getProperty(CHARSET).getValue());
+        final long statementTimeout = context.getProperty(STATEMENT_TIMEOUT).evaluateAttributeExpressions(flowFile).asTimePeriod(TimeUnit.MILLISECONDS);
+        final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(flowFile).getValue());
 
         // The documentation for the driver recommends the session remain open the entire time the processor is running
         // and states that it is thread-safe. This is why connectionSession is not in a try-with-resources.

http://git-wip-us.apache.org/repos/asf/nifi/blob/afd2b04a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java
index ca7f690..7387334 100644
--- a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java
+++ b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java
@@ -103,6 +103,7 @@ public class QueryCassandra extends AbstractCassandraProcessor {
                     + "Time Unit, such as: nanos, millis, secs, mins, hrs, days. A value of zero means there is no limit. ")
             .defaultValue("0 seconds")
             .required(true)
+            .expressionLanguageSupported(true)
             .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
             .build();
 
@@ -112,6 +113,7 @@ public class QueryCassandra extends AbstractCassandraProcessor {
                     + "and means there is no limit.")
             .defaultValue("0")
             .required(true)
+            .expressionLanguageSupported(true)
             .addValidator(StandardValidators.INTEGER_VALIDATOR)
             .build();
 
@@ -178,7 +180,7 @@ public class QueryCassandra extends AbstractCassandraProcessor {
         ComponentLog log = getLogger();
         try {
             connectToCassandra(context);
-            final int fetchSize = context.getProperty(FETCH_SIZE).asInteger();
+            final int fetchSize = context.getProperty(FETCH_SIZE).evaluateAttributeExpressions().asInteger();
             if (fetchSize > 0) {
                 synchronized (cluster.get()) {
                     cluster.get().getConfiguration().getQueryOptions().setFetchSize(fetchSize);
@@ -214,9 +216,9 @@ public class QueryCassandra extends AbstractCassandraProcessor {
 
         final ComponentLog logger = getLogger();
         final String selectQuery = context.getProperty(CQL_SELECT_QUERY).evaluateAttributeExpressions(fileToProcess).getValue();
-        final long queryTimeout = context.getProperty(QUERY_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS);
+        final long queryTimeout = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions(fileToProcess).asTimePeriod(TimeUnit.MILLISECONDS);
         final String outputFormat = context.getProperty(OUTPUT_FORMAT).getValue();
-        final Charset charset = Charset.forName(context.getProperty(CHARSET).getValue());
+        final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(fileToProcess).getValue());
         final StopWatch stopWatch = new StopWatch(true);
 
         if (fileToProcess == null) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/afd2b04a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/AbstractCassandraProcessorTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/AbstractCassandraProcessorTest.java b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/AbstractCassandraProcessorTest.java
index 3b0e273..aeb8f59 100644
--- a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/AbstractCassandraProcessorTest.java
+++ b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/AbstractCassandraProcessorTest.java
@@ -88,6 +88,16 @@ public class AbstractCassandraProcessorTest {
         testRunner.assertValid();
     }
 
+    @Test
+    public void testCustomValidateEL() throws Exception {
+        testRunner.setProperty(AbstractCassandraProcessor.CONTACT_POINTS, "${host}");
+        testRunner.setProperty(AbstractCassandraProcessor.KEYSPACE, "${keyspace}");
+        testRunner.setProperty(AbstractCassandraProcessor.USERNAME, "${user}");
+        testRunner.setProperty(AbstractCassandraProcessor.PASSWORD, "${password}");
+        testRunner.setProperty(AbstractCassandraProcessor.CHARSET, "${charset}");
+        testRunner.assertValid();
+    }
+
     @SuppressWarnings("unchecked")
     @Test
     public void testGetCassandraObject() throws Exception {
@@ -247,7 +257,6 @@ public class AbstractCassandraProcessorTest {
         assertNotNull(processor.getCluster());
     }
 
-
     /**
      * Provides a stubbed processor instance for testing
      */
@@ -255,7 +264,7 @@ public class AbstractCassandraProcessorTest {
 
         @Override
         protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-            return Arrays.asList(CONTACT_POINTS, USERNAME, PASSWORD, CONSISTENCY_LEVEL);
+            return Arrays.asList(CONTACT_POINTS, KEYSPACE, USERNAME, PASSWORD, CONSISTENCY_LEVEL, CHARSET);
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/nifi/blob/afd2b04a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/PutCassandraQLTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/PutCassandraQLTest.java b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/PutCassandraQLTest.java
index b3e4fe2..de66235 100644
--- a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/PutCassandraQLTest.java
+++ b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/PutCassandraQLTest.java
@@ -77,6 +77,17 @@ public class PutCassandraQLTest {
     }
 
     @Test
+    public void testProcessorELConfigValidity() {
+        testRunner.setProperty(AbstractCassandraProcessor.CONTACT_POINTS, "${hosts}");
+        testRunner.setProperty(AbstractCassandraProcessor.PASSWORD, "${pass}");
+        testRunner.setProperty(AbstractCassandraProcessor.USERNAME, "${user}");
+        testRunner.setProperty(AbstractCassandraProcessor.CHARSET, "${charset}");
+        testRunner.setProperty(PutCassandraQL.STATEMENT_TIMEOUT, "${timeout}");
+
+        testRunner.assertValid();
+    }
+
+    @Test
     public void testProcessorHappyPath() {
         setUpStandardTestConfig();
 
@@ -113,6 +124,54 @@ public class PutCassandraQLTest {
     }
 
     @Test
+    public void testProcessorHappyPathELConfig() {
+        testRunner.setProperty(AbstractCassandraProcessor.CONTACT_POINTS, "${hosts}");
+        testRunner.setProperty(AbstractCassandraProcessor.PASSWORD, "${pass}");
+        testRunner.setProperty(AbstractCassandraProcessor.USERNAME, "${user}");
+        testRunner.setProperty(AbstractCassandraProcessor.CONSISTENCY_LEVEL, "ONE");
+        testRunner.setProperty(AbstractCassandraProcessor.CHARSET, "${charset}");
+        testRunner.setProperty(PutCassandraQL.STATEMENT_TIMEOUT, "${timeout}");
+        testRunner.assertValid();
+
+        testRunner.setVariable("hosts", "localhost:9042");
+        testRunner.setVariable("user", "username");
+        testRunner.setVariable("pass", "password");
+        testRunner.setVariable("charset", "UTF-8");
+        testRunner.setVariable("timeout", "30 sec");
+
+        testRunner.enqueue("INSERT INTO users (user_id, first_name, last_name, properties, bits, scaleset, largenum, scale, byteobject, ts) VALUES ?, ?, ?, ?, ?, ?, ?, ?, ?, ?",
+                new HashMap<String, String>() {
+                    {
+                        put("cql.args.1.type", "int");
+                        put("cql.args.1.value", "1");
+                        put("cql.args.2.type", "text");
+                        put("cql.args.2.value", "Joe");
+                        put("cql.args.3.type", "text");
+                        // No value for arg 3 to test setNull
+                        put("cql.args.4.type", "map<text,text>");
+                        put("cql.args.4.value", "{'a':'Hello', 'b':'World'}");
+                        put("cql.args.5.type", "list<boolean>");
+                        put("cql.args.5.value", "[true,false,true]");
+                        put("cql.args.6.type", "set<double>");
+                        put("cql.args.6.value", "{1.0, 2.0}");
+                        put("cql.args.7.type", "bigint");
+                        put("cql.args.7.value", "20000000");
+                        put("cql.args.8.type", "float");
+                        put("cql.args.8.value", "1.0");
+                        put("cql.args.9.type", "blob");
+                        put("cql.args.9.value", "0xDEADBEEF");
+                        put("cql.args.10.type", "timestamp");
+                        put("cql.args.10.value", "2016-07-01T15:21:05Z");
+
+                    }
+                });
+
+        testRunner.run(1, true, true);
+        testRunner.assertAllFlowFilesTransferred(PutCassandraQL.REL_SUCCESS, 1);
+        testRunner.clearTransferState();
+    }
+
+    @Test
     public void testProcessorBadTimestamp() {
         setUpStandardTestConfig();
         processor.setExceptionToThrow(

http://git-wip-us.apache.org/repos/asf/nifi/blob/afd2b04a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/QueryCassandraTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/QueryCassandraTest.java b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/QueryCassandraTest.java
index 023f239..5cd54e9 100644
--- a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/QueryCassandraTest.java
+++ b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/QueryCassandraTest.java
@@ -79,6 +79,16 @@ public class QueryCassandraTest {
     }
 
     @Test
+    public void testProcessorELConfigValid() {
+        testRunner.setProperty(AbstractCassandraProcessor.CONSISTENCY_LEVEL, "ONE");
+        testRunner.setProperty(AbstractCassandraProcessor.CONTACT_POINTS, "${hosts}");
+        testRunner.setProperty(QueryCassandra.CQL_SELECT_QUERY, "${query}");
+        testRunner.setProperty(AbstractCassandraProcessor.PASSWORD, "${pass}");
+        testRunner.setProperty(AbstractCassandraProcessor.USERNAME, "${user}");
+        testRunner.assertValid();
+    }
+
+    @Test
     public void testProcessorNoInputFlowFileAndExceptions() {
         setUpStandardProcessorConfig();
 
@@ -140,6 +150,43 @@ public class QueryCassandraTest {
     }
 
     @Test
+    public void testProcessorELConfigJsonOutput() {
+        testRunner.setProperty(AbstractCassandraProcessor.CONTACT_POINTS, "${hosts}");
+        testRunner.setProperty(QueryCassandra.CQL_SELECT_QUERY, "${query}");
+        testRunner.setProperty(AbstractCassandraProcessor.PASSWORD, "${pass}");
+        testRunner.setProperty(AbstractCassandraProcessor.USERNAME, "${user}");
+        testRunner.setProperty(AbstractCassandraProcessor.CHARSET, "${charset}");
+        testRunner.setProperty(QueryCassandra.QUERY_TIMEOUT, "${timeout}");
+        testRunner.setProperty(QueryCassandra.FETCH_SIZE, "${fetch}");
+        testRunner.setIncomingConnection(false);
+        testRunner.assertValid();
+
+        testRunner.setVariable("hosts", "localhost:9042");
+        testRunner.setVariable("user", "username");
+        testRunner.setVariable("pass", "password");
+        testRunner.setVariable("charset", "UTF-8");
+        testRunner.setVariable("timeout", "30 sec");
+        testRunner.setVariable("fetch", "0");
+
+        // Test JSON output
+        testRunner.setProperty(QueryCassandra.OUTPUT_FORMAT, QueryCassandra.JSON_FORMAT);
+        testRunner.run(1, true, true);
+        testRunner.assertAllFlowFilesTransferred(QueryCassandra.REL_SUCCESS, 1);
+        List<MockFlowFile> files = testRunner.getFlowFilesForRelationship(QueryCassandra.REL_SUCCESS);
+        assertNotNull(files);
+        assertEquals("One file should be transferred to success", 1, files.size());
+        assertEquals("{\"results\":[{\"user_id\":\"user1\",\"first_name\":\"Joe\",\"last_name\":\"Smith\","
+                        + "\"emails\":[\"[email protected]\"],\"top_places\":[\"New York, NY\",\"Santa Clara, CA\"],"
+                        + "\"todo\":{\"2016-01-03 05:00:00+0000\":\"Set my alarm for a month from now\"},"
+                        + "\"registered\":\"false\",\"scale\":1.0,\"metric\":2.0},"
+                        + "{\"user_id\":\"user2\",\"first_name\":\"Mary\",\"last_name\":\"Jones\","
+                        + "\"emails\":[\"[email protected]\"],\"top_places\":[\"Orlando, FL\"],"
+                        + "\"todo\":{\"2016-02-03 05:00:00+0000\":\"Get milk and bread\"},"
+                        + "\"registered\":\"true\",\"scale\":3.0,\"metric\":4.0}]}",
+                new String(files.get(0).toByteArray()));
+    }
+
+    @Test
     public void testProcessorJsonOutputWithQueryTimeout() {
         setUpStandardProcessorConfig();
         testRunner.setProperty(QueryCassandra.QUERY_TIMEOUT, "5 sec");

Reply via email to