NIFI-901: Add QueryCassandra and PutCassandraQL processors

This closes #237

Signed-off-by: jpercivall <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/0f610793
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/0f610793
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/0f610793

Branch: refs/heads/master
Commit: 0f61079300fdf63dd24288c0d2e70d7707dab08f
Parents: e83429a
Author: Matt Burgess <[email protected]>
Authored: Wed Mar 16 13:19:18 2016 -0400
Committer: jpercivall <[email protected]>
Committed: Wed Mar 16 14:09:15 2016 -0400

----------------------------------------------------------------------
 nifi-assembly/pom.xml                           |   5 +
 .../nifi-cassandra-nar/pom.xml                  |  44 ++
 .../src/main/resources/META-INF/LICENSE         | 209 +++++++
 .../src/main/resources/META-INF/NOTICE          |  17 +
 .../nifi-cassandra-processors/pom.xml           |  65 +++
 .../cassandra/AbstractCassandraProcessor.java   | 458 +++++++++++++++
 .../processors/cassandra/PutCassandraQL.java    | 396 +++++++++++++
 .../processors/cassandra/QueryCassandra.java    | 555 +++++++++++++++++++
 .../org.apache.nifi.processor.Processor         |  16 +
 .../AbstractCassandraProcessorTest.java         | 286 ++++++++++
 .../cassandra/CassandraQueryTestUtil.java       | 119 ++++
 .../cassandra/PutCassandraQLTest.java           | 219 ++++++++
 .../cassandra/QueryCassandraTest.java           | 370 +++++++++++++
 nifi-nar-bundles/nifi-cassandra-bundle/pom.xml  |  43 ++
 nifi-nar-bundles/pom.xml                        |   1 +
 pom.xml                                         |   6 +
 16 files changed, 2809 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/0f610793/nifi-assembly/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml
index 38cda63..3ad558b 100644
--- a/nifi-assembly/pom.xml
+++ b/nifi-assembly/pom.xml
@@ -292,6 +292,11 @@ language governing permissions and limitations under the 
License. -->
             <artifactId>nifi-jms-processors-nar</artifactId>
             <type>nar</type>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-cassandra-nar</artifactId>
+            <type>nar</type>
+        </dependency>
     </dependencies>
 
     <properties>

http://git-wip-us.apache.org/repos/asf/nifi/blob/0f610793/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-nar/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-nar/pom.xml 
b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-nar/pom.xml
new file mode 100644
index 0000000..6649b94
--- /dev/null
+++ b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-nar/pom.xml
@@ -0,0 +1,44 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-cassandra-bundle</artifactId>
+        <version>0.6.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>nifi-cassandra-nar</artifactId>
+    <packaging>nar</packaging>
+    <properties>
+        <maven.javadoc.skip>true</maven.javadoc.skip>
+        <source.skip>true</source.skip>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-standard-services-api-nar</artifactId>
+            <type>nar</type>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-cassandra-processors</artifactId>
+        </dependency>
+    </dependencies>
+
+</project>

http://git-wip-us.apache.org/repos/asf/nifi/blob/0f610793/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-nar/src/main/resources/META-INF/LICENSE
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-nar/src/main/resources/META-INF/LICENSE
 
b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-nar/src/main/resources/META-INF/LICENSE
new file mode 100644
index 0000000..6effaa8
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-nar/src/main/resources/META-INF/LICENSE
@@ -0,0 +1,209 @@
+
+                                 Apache License
+                           Version 2.0, January 2004
+                        http://www.apache.org/licenses/
+
+   TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+   1. Definitions.
+
+      "License" shall mean the terms and conditions for use, reproduction,
+      and distribution as defined by Sections 1 through 9 of this document.
+
+      "Licensor" shall mean the copyright owner or entity authorized by
+      the copyright owner that is granting the License.
+
+      "Legal Entity" shall mean the union of the acting entity and all
+      other entities that control, are controlled by, or are under common
+      control with that entity. For the purposes of this definition,
+      "control" means (i) the power, direct or indirect, to cause the
+      direction or management of such entity, whether by contract or
+      otherwise, or (ii) ownership of fifty percent (50%) or more of the
+      outstanding shares, or (iii) beneficial ownership of such entity.
+
+      "You" (or "Your") shall mean an individual or Legal Entity
+      exercising permissions granted by this License.
+
+      "Source" form shall mean the preferred form for making modifications,
+      including but not limited to software source code, documentation
+      source, and configuration files.
+
+      "Object" form shall mean any form resulting from mechanical
+      transformation or translation of a Source form, including but
+      not limited to compiled object code, generated documentation,
+      and conversions to other media types.
+
+      "Work" shall mean the work of authorship, whether in Source or
+      Object form, made available under the License, as indicated by a
+      copyright notice that is included in or attached to the work
+      (an example is provided in the Appendix below).
+
+      "Derivative Works" shall mean any work, whether in Source or Object
+      form, that is based on (or derived from) the Work and for which the
+      editorial revisions, annotations, elaborations, or other modifications
+      represent, as a whole, an original work of authorship. For the purposes
+      of this License, Derivative Works shall not include works that remain
+      separable from, or merely link (or bind by name) to the interfaces of,
+      the Work and Derivative Works thereof.
+
+      "Contribution" shall mean any work of authorship, including
+      the original version of the Work and any modifications or additions
+      to that Work or Derivative Works thereof, that is intentionally
+      submitted to Licensor for inclusion in the Work by the copyright owner
+      or by an individual or Legal Entity authorized to submit on behalf of
+      the copyright owner. For the purposes of this definition, "submitted"
+      means any form of electronic, verbal, or written communication sent
+      to the Licensor or its representatives, including but not limited to
+      communication on electronic mailing lists, source code control systems,
+      and issue tracking systems that are managed by, or on behalf of, the
+      Licensor for the purpose of discussing and improving the Work, but
+      excluding communication that is conspicuously marked or otherwise
+      designated in writing by the copyright owner as "Not a Contribution."
+
+      "Contributor" shall mean Licensor and any individual or Legal Entity
+      on behalf of whom a Contribution has been received by Licensor and
+      subsequently incorporated within the Work.
+
+   2. Grant of Copyright License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      copyright license to reproduce, prepare Derivative Works of,
+      publicly display, publicly perform, sublicense, and distribute the
+      Work and such Derivative Works in Source or Object form.
+
+   3. Grant of Patent License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      (except as stated in this section) patent license to make, have made,
+      use, offer to sell, sell, import, and otherwise transfer the Work,
+      where such license applies only to those patent claims licensable
+      by such Contributor that are necessarily infringed by their
+      Contribution(s) alone or by combination of their Contribution(s)
+      with the Work to which such Contribution(s) was submitted. If You
+      institute patent litigation against any entity (including a
+      cross-claim or counterclaim in a lawsuit) alleging that the Work
+      or a Contribution incorporated within the Work constitutes direct
+      or contributory patent infringement, then any patent licenses
+      granted to You under this License for that Work shall terminate
+      as of the date such litigation is filed.
+
+   4. Redistribution. You may reproduce and distribute copies of the
+      Work or Derivative Works thereof in any medium, with or without
+      modifications, and in Source or Object form, provided that You
+      meet the following conditions:
+
+      (a) You must give any other recipients of the Work or
+          Derivative Works a copy of this License; and
+
+      (b) You must cause any modified files to carry prominent notices
+          stating that You changed the files; and
+
+      (c) You must retain, in the Source form of any Derivative Works
+          that You distribute, all copyright, patent, trademark, and
+          attribution notices from the Source form of the Work,
+          excluding those notices that do not pertain to any part of
+          the Derivative Works; and
+
+      (d) If the Work includes a "NOTICE" text file as part of its
+          distribution, then any Derivative Works that You distribute must
+          include a readable copy of the attribution notices contained
+          within such NOTICE file, excluding those notices that do not
+          pertain to any part of the Derivative Works, in at least one
+          of the following places: within a NOTICE text file distributed
+          as part of the Derivative Works; within the Source form or
+          documentation, if provided along with the Derivative Works; or,
+          within a display generated by the Derivative Works, if and
+          wherever such third-party notices normally appear. The contents
+          of the NOTICE file are for informational purposes only and
+          do not modify the License. You may add Your own attribution
+          notices within Derivative Works that You distribute, alongside
+          or as an addendum to the NOTICE text from the Work, provided
+          that such additional attribution notices cannot be construed
+          as modifying the License.
+
+      You may add Your own copyright statement to Your modifications and
+      may provide additional or different license terms and conditions
+      for use, reproduction, or distribution of Your modifications, or
+      for any such Derivative Works as a whole, provided Your use,
+      reproduction, and distribution of the Work otherwise complies with
+      the conditions stated in this License.
+
+   5. Submission of Contributions. Unless You explicitly state otherwise,
+      any Contribution intentionally submitted for inclusion in the Work
+      by You to the Licensor shall be under the terms and conditions of
+      this License, without any additional terms or conditions.
+      Notwithstanding the above, nothing herein shall supersede or modify
+      the terms of any separate license agreement you may have executed
+      with Licensor regarding such Contributions.
+
+   6. Trademarks. This License does not grant permission to use the trade
+      names, trademarks, service marks, or product names of the Licensor,
+      except as required for reasonable and customary use in describing the
+      origin of the Work and reproducing the content of the NOTICE file.
+
+   7. Disclaimer of Warranty. Unless required by applicable law or
+      agreed to in writing, Licensor provides the Work (and each
+      Contributor provides its Contributions) on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+      implied, including, without limitation, any warranties or conditions
+      of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+      PARTICULAR PURPOSE. You are solely responsible for determining the
+      appropriateness of using or redistributing the Work and assume any
+      risks associated with Your exercise of permissions under this License.
+
+   8. Limitation of Liability. In no event and under no legal theory,
+      whether in tort (including negligence), contract, or otherwise,
+      unless required by applicable law (such as deliberate and grossly
+      negligent acts) or agreed to in writing, shall any Contributor be
+      liable to You for damages, including any direct, indirect, special,
+      incidental, or consequential damages of any character arising as a
+      result of this License or out of the use or inability to use the
+      Work (including but not limited to damages for loss of goodwill,
+      work stoppage, computer failure or malfunction, or any and all
+      other commercial damages or losses), even if such Contributor
+      has been advised of the possibility of such damages.
+
+   9. Accepting Warranty or Additional Liability. While redistributing
+      the Work or Derivative Works thereof, You may choose to offer,
+      and charge a fee for, acceptance of support, warranty, indemnity,
+      or other liability obligations and/or rights consistent with this
+      License. However, in accepting such obligations, You may act only
+      on Your own behalf and on Your sole responsibility, not on behalf
+      of any other Contributor, and only if You agree to indemnify,
+      defend, and hold each Contributor harmless for any liability
+      incurred by, or claims asserted against, such Contributor by reason
+      of your accepting any such warranty or additional liability.
+
+   END OF TERMS AND CONDITIONS
+
+   APPENDIX: How to apply the Apache License to your work.
+
+      To apply the Apache License to your work, attach the following
+      boilerplate notice, with the fields enclosed by brackets "[]"
+      replaced with your own identifying information. (Don't include
+      the brackets!)  The text should be enclosed in the appropriate
+      comment syntax for the file format. We also recommend that a
+      file or class name and description of purpose be included on the
+      same "printed page" as the copyright notice for easier
+      identification within third-party archives.
+
+   Copyright [yyyy] [name of copyright owner]
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+
+APACHE NIFI SUBCOMPONENTS:
+
+The Apache NiFi project contains subcomponents with separate copyright
+notices and license terms. Your use of the source code for these
+subcomponents is subject to the terms and conditions of the following
+licenses.
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/0f610793/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-nar/src/main/resources/META-INF/NOTICE
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-nar/src/main/resources/META-INF/NOTICE
 
b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-nar/src/main/resources/META-INF/NOTICE
new file mode 100644
index 0000000..a62a4e9
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-nar/src/main/resources/META-INF/NOTICE
@@ -0,0 +1,17 @@
+nifi-cassandra-nar
+Copyright 2016 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+
+******************
+Apache Software License v2
+******************
+
+The following binary components are provided under the Apache Software License 
v2
+
+  (ASLv2) Apache Avro
+      The following NOTICE information applies:
+        Apache Avro
+        Copyright 2009-2013 The Apache Software Foundation
+

http://git-wip-us.apache.org/repos/asf/nifi/blob/0f610793/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/pom.xml
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/pom.xml 
b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/pom.xml
new file mode 100644
index 0000000..242460e
--- /dev/null
+++ b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/pom.xml
@@ -0,0 +1,65 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-cassandra-bundle</artifactId>
+        <version>0.6.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>nifi-cassandra-processors</artifactId>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-processor-utils</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-properties</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.datastax.cassandra</groupId>
+            <artifactId>cassandra-driver-core</artifactId>
+            <version>2.1.9</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-ssl-context-service-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.avro</groupId>
+            <artifactId>avro</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-ssl-context-service</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/nifi/blob/0f610793/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/AbstractCassandraProcessor.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/AbstractCassandraProcessor.java
 
b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/AbstractCassandraProcessor.java
new file mode 100644
index 0000000..672a3ee
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/AbstractCassandraProcessor.java
@@ -0,0 +1,458 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.cassandra;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.DataType;
+import com.datastax.driver.core.Metadata;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.SSLOptions;
+import com.datastax.driver.core.Session;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.authorization.exception.ProviderCreationException;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.logging.ProcessorLog;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.security.util.SslContextFactory;
+import org.apache.nifi.ssl.SSLContextService;
+
+import javax.net.ssl.SSLContext;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * AbstractCassandraProcessor is a base class for Cassandra processors and 
contains logic and variables common to most
+ * processors integrating with Apache Cassandra.
+ */
+public abstract class AbstractCassandraProcessor extends AbstractProcessor {
+
+    public static final int DEFAULT_CASSANDRA_PORT = 9042;
+
+    private static final Validator HOSTNAME_PORT_VALIDATOR = new Validator() {
+        @Override
+        public ValidationResult validate(final String subject, final String 
input, final ValidationContext context) {
+            final List<String> esList = Arrays.asList(input.split(","));
+            for (String hostnamePort : esList) {
+                String[] addresses = hostnamePort.split(":");
+                // Protect against invalid input like http://127.0.0.1:9042 
(URL scheme should not be there)
+                if (addresses.length != 2) {
+                    return new 
ValidationResult.Builder().subject(subject).input(input).explanation(
+                            "Each entry must be in hostname:port form (no 
scheme such as http://, and port must be specified)")
+                            .valid(false).build();
+                }
+                // Validate the port
+                String port = addresses[1].trim();
+                ValidationResult portValidatorResult = 
StandardValidators.PORT_VALIDATOR.validate(subject, port, context);
+                if (!portValidatorResult.isValid()) {
+                    return portValidatorResult;
+                }
+
+            }
+            return new 
ValidationResult.Builder().subject(subject).input(input).explanation(
+                    "Valid cluster definition").valid(true).build();
+        }
+    };
+
+    // Common descriptors
+    // Required; Expression Language is disabled and the value is checked by
+    // HOSTNAME_PORT_VALIDATOR.
+    public static final PropertyDescriptor CONTACT_POINTS = new 
PropertyDescriptor.Builder()
+            .name("Cassandra Contact Points")
+            .description("Contact points are addresses of Cassandra nodes. The 
list of contact points should be "
+                    + "comma-separated and in hostname:port format. Example 
node1:port,node2:port,...."
+                    + " The default client port for Cassandra is 9042, but the 
port(s) must be explicitly specified.")
+            .required(true)
+            .expressionLanguageSupported(false)
+            .addValidator(HOSTNAME_PORT_VALIDATOR)
+            .build();
+
+    // Optional; when set the value must be non-empty.
+    public static final PropertyDescriptor KEYSPACE = new 
PropertyDescriptor.Builder()
+            .name("Keyspace")
+            .description("The Cassandra Keyspace to connect to. If no keyspace 
is specified, the query will need to "
+                    + "include the keyspace name before any table reference.")
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    // Optional reference to an SSLContextService controller service.
+    public static final PropertyDescriptor PROP_SSL_CONTEXT_SERVICE = new 
PropertyDescriptor.Builder()
+            .name("SSL Context Service")
+            .description("The SSL Context Service used to provide client 
certificate information for TLS/SSL "
+                    + "connections.")
+            .required(false)
+            .identifiesControllerService(SSLContextService.class)
+            .build();
+
+    // Optional; restricted to the SSLContextService.ClientAuth constants and
+    // defaults to REQUIRED.
+    public static final PropertyDescriptor CLIENT_AUTH = new 
PropertyDescriptor.Builder()
+            .name("Client Auth")
+            .description("Client authentication policy when connecting to 
secure (TLS/SSL) cluster. "
+                    + "Possible values are REQUIRED, WANT, NONE. This property 
is only used when an SSL Context "
+                    + "has been defined and enabled.")
+            .required(false)
+            .allowableValues(SSLContextService.ClientAuth.values())
+            .defaultValue("REQUIRED")
+            .build();
+
+    // Optional; when set the value must be non-empty.
+    public static final PropertyDescriptor USERNAME = new 
PropertyDescriptor.Builder()
+            .name("Username")
+            .description("Username to access the Cassandra cluster")
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    // Optional and marked sensitive; when set the value must be non-empty.
+    public static final PropertyDescriptor PASSWORD = new 
PropertyDescriptor.Builder()
+            .name("Password")
+            .description("Password to access the Cassandra cluster")
+            .required(false)
+            .sensitive(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    // Required; restricted to the driver's ConsistencyLevel constants and
+    // defaults to ONE.
+    public static final PropertyDescriptor CONSISTENCY_LEVEL = new 
PropertyDescriptor.Builder()
+            .name("Consistency Level")
+            .description("The strategy for how many replicas must respond 
before results are returned.")
+            .required(true)
+            .allowableValues(ConsistencyLevel.values())
+            .defaultValue("ONE")
+            .build();
+
+    // Required; checked by CHARACTER_SET_VALIDATOR and defaults to UTF-8.
+    public static final PropertyDescriptor CHARSET = new 
PropertyDescriptor.Builder()
+            .name("Character Set")
+            .description("Specifies the character set of the record data.")
+            .required(true)
+            .defaultValue("UTF-8")
+            .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
+            .build();
+
+    // Descriptors common to the Cassandra processors, kept in the order in
+    // which they are registered below.
+    static List<PropertyDescriptor> descriptors = new ArrayList<>();
+
+    static {
+        descriptors.addAll(Arrays.asList(
+                CONTACT_POINTS,
+                KEYSPACE,
+                PROP_SSL_CONTEXT_SERVICE,
+                CLIENT_AUTH,
+                USERNAME,
+                PASSWORD,
+                CONSISTENCY_LEVEL,
+                CHARSET));
+    }
+
+    // Holders for the driver Cluster and Session; both start out holding
+    // null. connectToCassandra() only builds a new Cluster while
+    // cluster.get() is still null.
+    protected final AtomicReference<Cluster> cluster = new 
AtomicReference<>(null);
+    protected final AtomicReference<Session> cassandraSession = new 
AtomicReference<>(null);
+
+    /**
+     * Performs cross-property validation: Username and Password must either
+     * both be set or both be left empty.
+     *
+     * @param validationContext provides the configured property values
+     * @return the validation failures found; empty when the two properties
+     *         are consistent
+     */
+    @Override
+    protected Collection<ValidationResult> customValidate(
+            ValidationContext validationContext) {
+        Set<ValidationResult> results = new HashSet<>();
+
+        // Ensure that if username or password is set, then the other is too
+        Map<PropertyDescriptor, String> propertyMap =
+                validationContext.getProperties();
+        boolean usernameMissing =
+                StringUtils.isEmpty(propertyMap.get(USERNAME));
+        boolean passwordMissing =
+                StringUtils.isEmpty(propertyMap.get(PASSWORD));
+        if (usernameMissing != passwordMissing) {
+            // Name the affected properties as the subject so the failure
+            // can be attributed to them.
+            results.add(new ValidationResult.Builder()
+                    .subject(USERNAME.getName() + " / " + PASSWORD.getName())
+                    .valid(false)
+                    .explanation("If username or password is specified, "
+                            + "then the other must be specified as well")
+                    .build());
+        }
+
+        return results;
+    }
+
+    /**
+     * Builds a Cassandra {@link Cluster} from the processor configuration, opens a {@link Session} on it and
+     * stores both in this processor's atomic references. Does nothing when a cluster reference is already held.
+     * <p>
+     * If connecting or configuring the freshly built cluster fails, the cluster is closed before the exception
+     * is rethrown so that a failed attempt does not leak driver resources.
+     *
+     * @param context the process context supplying contact points, keyspace, SSL, credential and consistency settings
+     * @throws ProviderCreationException if an SSL context service is configured and the client auth value is not recognized
+     */
+    protected void connectToCassandra(ProcessContext context) {
+        if (cluster.get() != null) {
+            // A cluster is already held; stop() resets the reference to null
+            return;
+        }
+
+        final ProcessorLog log = getLogger();
+        final String contactPointList = context.getProperty(CONTACT_POINTS).getValue();
+        final String consistencyLevel = context.getProperty(CONSISTENCY_LEVEL).getValue();
+        final List<InetSocketAddress> contactPoints = getContactPoints(contactPointList);
+
+        // Set up the client for secure (SSL/TLS communications) if configured to do so
+        final SSLContextService sslService =
+                context.getProperty(PROP_SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
+        final String rawClientAuth = context.getProperty(CLIENT_AUTH).getValue();
+        final SSLContext sslContext;
+
+        if (sslService != null) {
+            final SSLContextService.ClientAuth clientAuth;
+            if (StringUtils.isBlank(rawClientAuth)) {
+                clientAuth = SSLContextService.ClientAuth.REQUIRED;
+            } else {
+                try {
+                    clientAuth = SSLContextService.ClientAuth.valueOf(rawClientAuth);
+                } catch (final IllegalArgumentException iae) {
+                    // List the values of the enum that is actually being parsed
+                    throw new ProviderCreationException(String.format("Unrecognized client auth '%s'. Possible values are [%s]",
+                            rawClientAuth, StringUtils.join(SSLContextService.ClientAuth.values(), ", ")));
+                }
+            }
+            sslContext = sslService.createSSLContext(clientAuth);
+        } else {
+            sslContext = null;
+        }
+
+        final String username, password;
+        final PropertyValue usernameProperty = context.getProperty(USERNAME);
+        final PropertyValue passwordProperty = context.getProperty(PASSWORD);
+
+        if (usernameProperty != null && passwordProperty != null) {
+            username = usernameProperty.getValue();
+            password = passwordProperty.getValue();
+        } else {
+            username = null;
+            password = null;
+        }
+
+        // Create the cluster and connect to it
+        final Cluster newCluster = createCluster(contactPoints, sslContext, username, password);
+        final Session newSession;
+        try {
+            final PropertyValue keyspaceProperty = context.getProperty(KEYSPACE);
+            final String keyspace = (keyspaceProperty == null) ? null : keyspaceProperty.getValue();
+
+            // Only ask the driver to switch keyspace when one was actually configured
+            if (StringUtils.isBlank(keyspace)) {
+                newSession = newCluster.connect();
+            } else {
+                newSession = newCluster.connect(keyspace);
+            }
+            newCluster.getConfiguration().getQueryOptions().setConsistencyLevel(ConsistencyLevel.valueOf(consistencyLevel));
+            final Metadata metadata = newCluster.getMetadata();
+            log.info("Connected to Cassandra cluster: {}", new Object[]{metadata.getClusterName()});
+        } catch (final RuntimeException e) {
+            // The cluster was built but is unusable; release it before propagating the failure
+            newCluster.close();
+            throw e;
+        }
+
+        cluster.set(newCluster);
+        cassandraSession.set(newSession);
+    }
+
+    /**
+     * Uses a Cluster.Builder to create a Cassandra cluster reference using 
the given parameters
+     *
+     * @param contactPoints The contact points (hostname:port list of 
Cassandra nodes)
+     * @param sslContext    The SSL context (used for secure connections)
+     * @param username      The username for connection authentication
+     * @param password      The password for connection authentication
+     * @return A reference to the Cluster object associated with the given 
Cassandra configuration
+     */
+    protected Cluster createCluster(List<InetSocketAddress> contactPoints, 
SSLContext sslContext,
+                                    String username, String password) {
+        Cluster.Builder builder = 
Cluster.builder().addContactPointsWithPorts(contactPoints);
+        if (sslContext != null) {
+            builder = builder.withSSL(new SSLOptions(sslContext, 
SSLOptions.DEFAULT_SSL_CIPHER_SUITES));
+        }
+        if (username != null && password != null) {
+            builder = builder.withCredentials(username, password);
+        }
+        return builder.build();
+    }
+
+    /**
+     * Closes the held session and cluster (in that order), if present, and resets each reference to null.
+     */
+    public void stop() {
+        final Session openSession = cassandraSession.get();
+        if (openSession != null) {
+            openSession.close();
+            cassandraSession.set(null);
+        }
+
+        final Cluster openCluster = cluster.get();
+        if (openCluster != null) {
+            openCluster.close();
+            cluster.set(null);
+        }
+    }
+
+
+    /**
+     * Reads column {@code i} of the given row using the accessor that matches the column's Cassandra type.
+     * varint/decimal values and values of any type without a dedicated branch are returned as Strings.
+     *
+     * @param row      the row to read from
+     * @param i        the index of the column within the row
+     * @param dataType the Cassandra data type of the column
+     * @return the column value; null when a stringified column holds no value, or when a collection type
+     *         matches none of set, list or map
+     * @throws IllegalArgumentException if the type is a collection but carries no type arguments
+     */
+    protected static Object getCassandraObject(Row row, int i, DataType dataType) {
+        if (dataType.equals(DataType.blob())) {
+            return row.getBytes(i);
+
+        } else if (dataType.equals(DataType.varint()) || dataType.equals(DataType.decimal())) {
+            // Avro can't handle BigDecimal and BigInteger as numbers - it will throw an
+            // AvroRuntimeException such as: "Unknown datum type: java.math.BigDecimal: 38"
+            return toStringOrNull(row.getObject(i));
+
+        } else if (dataType.equals(DataType.cboolean())) {
+            return row.getBool(i);
+
+        } else if (dataType.equals(DataType.cint())) {
+            return row.getInt(i);
+
+        } else if (dataType.equals(DataType.bigint())
+                || dataType.equals(DataType.counter())) {
+            return row.getLong(i);
+
+        } else if (dataType.equals(DataType.ascii())
+                || dataType.equals(DataType.text())
+                || dataType.equals(DataType.varchar())) {
+            return row.getString(i);
+
+        } else if (dataType.equals(DataType.cfloat())) {
+            return row.getFloat(i);
+
+        } else if (dataType.equals(DataType.cdouble())) {
+            return row.getDouble(i);
+
+        } else if (dataType.equals(DataType.timestamp())) {
+            return row.getDate(i);
+
+        } else if (dataType.isCollection()) {
+
+            List<DataType> typeArguments = dataType.getTypeArguments();
+            if (typeArguments == null || typeArguments.size() == 0) {
+                throw new IllegalArgumentException("Column[" + i + "] " + dataType.getName()
+                        + " is a collection but no type arguments were specified!");
+            }
+            // Get the first type argument, to be used for lists and sets (and the first in a map)
+            DataType firstArg = typeArguments.get(0);
+            if (dataType.equals(DataType.set(firstArg))) {
+                return row.getSet(i, firstArg.asJavaClass());
+            } else if (dataType.equals(DataType.list(firstArg))) {
+                return row.getList(i, firstArg.asJavaClass());
+            } else {
+                // Must be an n-arg collection like map
+                DataType secondArg = typeArguments.get(1);
+                if (dataType.equals(DataType.map(firstArg, secondArg))) {
+                    return row.getMap(i, firstArg.asJavaClass(), secondArg.asJavaClass());
+                }
+            }
+
+        } else {
+            // Every remaining type has no dedicated accessor above, so it is represented by
+            // its toString() form.
+            return toStringOrNull(row.getObject(i));
+        }
+        return null;
+    }
+
+    /**
+     * Converts a column value to its String form, passing null through instead of failing on it.
+     */
+    private static String toStringOrNull(Object value) {
+        return (value == null) ? null : value.toString();
+    }
+
+    /**
+     * Creates an Avro union schema consisting of null and the specified type.
+     *
+     * @param dataType The data type of the field
+     * @return a union schema of null and the schema for the given type
+     */
+    public static Schema getUnionFieldType(String dataType) {
+        final Schema memberSchema = getSchemaForType(dataType);
+        return SchemaBuilder.builder()
+                .unionOf()
+                .nullBuilder().endNull()
+                .and()
+                .type(memberSchema)
+                .endUnion();
+    }
+
+    /**
+     * Creates the Avro schema for the named primitive type.
+     *
+     * @param dataType The data type of the field; one of string, boolean, int, long, float, double or bytes
+     * @return the Avro schema for the given type name
+     * @throws IllegalArgumentException if the name is not one of the supported type names
+     */
+    public static Schema getSchemaForType(String dataType) {
+        final SchemaBuilder.TypeBuilder<Schema> schemas = SchemaBuilder.builder();
+        switch (dataType) {
+            case "string":
+                return schemas.stringType();
+            case "boolean":
+                return schemas.booleanType();
+            case "int":
+                return schemas.intType();
+            case "long":
+                return schemas.longType();
+            case "float":
+                return schemas.floatType();
+            case "double":
+                return schemas.doubleType();
+            case "bytes":
+                return schemas.bytesType();
+            default:
+                throw new IllegalArgumentException("Unknown Avro primitive type: " + dataType);
+        }
+    }
+
+    /**
+     * Maps a primitive Cassandra data type to the name of the Avro primitive type used to represent it.
+     * Types that getCassandraObject() renders as Strings (including varint and decimal) map to "string".
+     *
+     * @param dataType the Cassandra data type to map
+     * @return one of "string", "boolean", "int", "long", "float", "double" or "bytes"
+     * @throws IllegalArgumentException if the Cassandra type has no mapping
+     */
+    public static String getPrimitiveAvroTypeFromCassandraType(DataType dataType) {
+        // Map types from Cassandra to Avro where possible
+        if (dataType.equals(DataType.ascii())
+                || dataType.equals(DataType.text())
+                || dataType.equals(DataType.varchar())
+                // Nonstandard types represented by this processor as a string
+                || dataType.equals(DataType.timestamp())
+                || dataType.equals(DataType.timeuuid())
+                || dataType.equals(DataType.uuid())
+                || dataType.equals(DataType.inet())
+                || dataType.equals(DataType.varint())
+                // decimal values are read via toString() in getCassandraObject(), so they are strings too
+                || dataType.equals(DataType.decimal())) {
+            return "string";
+
+        } else if (dataType.equals(DataType.cboolean())) {
+            return "boolean";
+
+        } else if (dataType.equals(DataType.cint())) {
+            return "int";
+
+        } else if (dataType.equals(DataType.bigint())
+                || dataType.equals(DataType.counter())) {
+            return "long";
+
+        } else if (dataType.equals(DataType.cfloat())) {
+            return "float";
+
+        } else if (dataType.equals(DataType.cdouble())) {
+            return "double";
+
+        } else if (dataType.equals(DataType.blob())) {
+            return "bytes";
+
+        } else {
+            throw new IllegalArgumentException("createSchema: Unknown Cassandra data type " + dataType.getName()
+                    + " cannot be converted to Avro type");
+        }
+    }
+
+    /**
+     * Looks up the primitive Cassandra data type whose string form equals the given name.
+     *
+     * @param dataTypeName the type name to look up
+     * @return the matching primitive data type
+     * @throws IllegalArgumentException if no primitive type has that name
+     */
+    public static DataType getPrimitiveDataTypeFromString(String dataTypeName) {
+        for (final DataType candidate : DataType.allPrimitiveTypes()) {
+            final String candidateName = candidate.toString();
+            if (candidateName.equals(dataTypeName)) {
+                return candidate;
+            }
+        }
+        throw new IllegalArgumentException("Not a primitive Cassandra type: " + dataTypeName);
+    }
+
+    /**
+     * Converts a comma-separated list of host[:port] entries into socket addresses for the Cassandra
+     * contact points. Entries without a port use DEFAULT_CASSANDRA_PORT.
+     *
+     * @param contactPointList A comma-separated list of Cassandra contact points (host:port,host2:port2, etc.)
+     * @return List of InetSocketAddresses for the Cassandra contact points, or null if the given list is null
+     */
+    public List<InetSocketAddress> getContactPoints(String contactPointList) {
+        if (contactPointList == null) {
+            return null;
+        }
+
+        final String[] entries = contactPointList.split(",");
+        final List<InetSocketAddress> resolved = new ArrayList<>();
+
+        for (final String entry : entries) {
+            final String[] hostAndPort = entry.split(":");
+            final String host = hostAndPort[0].trim();
+
+            final int port;
+            if (hostAndPort.length > 1) {
+                port = Integer.parseInt(hostAndPort[1].trim());
+            } else {
+                port = DEFAULT_CASSANDRA_PORT;
+            }
+
+            resolved.add(new InetSocketAddress(host, port));
+        }
+        return resolved;
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/nifi/blob/0f610793/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraQL.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraQL.java
 
b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraQL.java
new file mode 100644
index 0000000..c923b5b
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraQL.java
@@ -0,0 +1,396 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.cassandra;
+
+import com.datastax.driver.core.BoundStatement;
+import com.datastax.driver.core.DataType;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.ResultSetFuture;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.exceptions.AuthenticationException;
+import com.datastax.driver.core.exceptions.InvalidTypeException;
+import com.datastax.driver.core.exceptions.NoHostAvailableException;
+import com.datastax.driver.core.exceptions.QueryExecutionException;
+import com.datastax.driver.core.exceptions.QueryValidationException;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.ReadsAttributes;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnShutdown;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ProcessorLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.stream.io.StreamUtils;
+import org.apache.nifi.util.StringUtils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+@SupportsBatching
+@Tags({"cassandra", "cql", "put", "insert", "update", "set"})
+@EventDriven
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@CapabilityDescription("Execute provided Cassandra Query Language (CQL) 
statement on a Cassandra 1.x or 2.x cluster. "
+        + "The content of an incoming FlowFile is expected to be the CQL 
command to execute. The CQL command may use "
+        + "the ? to escape parameters. In this case, the parameters to use 
must exist as FlowFile attributes with the "
+        + "naming convention cql.args.N.type and cql.args.N.value, where N is 
a positive integer. The cql.args.N.type "
+        + "is expected to be a lowercase string indicating the Cassandra 
type.")
+@ReadsAttributes({
+        @ReadsAttribute(attribute = "cql.args.N.type",
+                description = "Incoming FlowFiles are expected to be 
parameterized CQL statements. The type of each "
+                        + "parameter is specified as a lowercase string 
corresponding to the Cassandra data type (text, "
+                        + "int, boolean, e.g.). In the case of collections, 
the primitive type(s) of the elements in the "
+                        + "collection should be comma-delimited, follow the 
collection type, and be enclosed in angle brackets "
+                        + "(< and >), for example set<text> or map<timestamp, 
int>."),
+        @ReadsAttribute(attribute = "cql.args.N.value",
+                description = "Incoming FlowFiles are expected to be 
parameterized CQL statements. The value of the "
+                        + "parameters are specified as cql.args.1.value, 
cql.args.2.value, cql.args.3.value, and so on. The "
+                        + " type of the cql.args.1.value parameter is 
specified by the cql.args.1.type attribute.")
+})
+public class PutCassandraQL extends AbstractCassandraProcessor {
+
+    // Required time-period property ("Max Wait Time"), default "0 seconds". onTrigger() reads it in
+    // milliseconds and waits without a limit when the value is not greater than zero.
+    // NOTE(review): the description text says "select query" although this processor executes the
+    // statement found in the FlowFile content - confirm whether the wording should be changed.
+    public static final PropertyDescriptor STATEMENT_TIMEOUT = new 
PropertyDescriptor.Builder()
+            .name("Max Wait Time")
+            .description("The maximum amount of time allowed for a running CQL 
select query. Must be of format "
+                    + "<duration> <TimeUnit> where <duration> is a 
non-negative integer and TimeUnit is a supported "
+                    + "Time Unit, such as: nanos, millis, secs, mins, hrs, 
days. A value of zero means there is no limit. ")
+            .defaultValue("0 seconds")
+            .required(true)
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .build();
+
+    // Unmodifiable list of supported properties; assigned once in the static initializer below.
+    private final static List<PropertyDescriptor> propertyDescriptors;
+
+    // Relationships
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("Successfully executed CQL statement.")
+            .build();
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("CQL statement execution failed.")
+            .build();
+    public static final Relationship REL_RETRY = new 
Relationship.Builder().name("retry")
+            .description("A FlowFile is transferred to this relationship if 
the statement cannot be executed successfully but "
+                    + "attempting the operation again may succeed.")
+            .build();
+
+    // Unmodifiable set of the relationships above; assigned once in the static initializer below.
+    private final static Set<Relationship> relationships;
+
+    // Matches attribute names of the form cql.args.N.type and captures N (the parameter number) in group 1.
+    private static final Pattern CQL_TYPE_ATTRIBUTE_PATTERN = 
Pattern.compile("cql\\.args\\.(\\d+)\\.type");
+
+    // Matches on top-level type (primitive types like text,int) and also for collections
+    // (like list<boolean> and map<float,double>). Group 1 is the top-level type name.
+    private static final Pattern CQL_TYPE_PATTERN = 
Pattern.compile("([^<]+)(<([^,>]+)(,([^,>]+))*>)?");
+
+
+    /*
+     * Will ensure that the list of property descriptors is build only once.
+     * Will also create a Set of relationships
+     */
+    static {
+        List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
+        _propertyDescriptors.addAll(descriptors);
+        _propertyDescriptors.add(STATEMENT_TIMEOUT);
+        propertyDescriptors = 
Collections.unmodifiableList(_propertyDescriptors);
+
+        Set<Relationship> _relationships = new HashSet<>();
+        _relationships.add(REL_SUCCESS);
+        _relationships.add(REL_FAILURE);
+        _relationships.add(REL_RETRY);
+        relationships = Collections.unmodifiableSet(_relationships);
+    }
+
+    /**
+     * Returns the unmodifiable list of property descriptors assembled in this class's static initializer.
+     *
+     * @return the properties supported by this processor
+     */
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return propertyDescriptors;
+    }
+
+    /**
+     * Returns the unmodifiable set of relationships (success, failure, retry) assembled in this class's
+     * static initializer.
+     *
+     * @return the relationships this processor can route FlowFiles to
+     */
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        ProcessorLog log = getLogger();
+        try {
+            connectToCassandra(context);
+        } catch (final NoHostAvailableException nhae) {
+            log.error("No host in the Cassandra cluster can be contacted 
successfully to execute this statement", nhae);
+            // Log up to 10 error messages. Otherwise if a 1000-node cluster 
was specified but there was no connectivity,
+            // a thousand error messages would be logged. However we would 
like information from Cassandra itself, so
+            // cap the error limit at 10, format the messages, and don't 
include the stack trace (it is displayed by the
+            // logger message above).
+            log.error(nhae.getCustomMessage(10, true, false));
+            throw new ProcessException(nhae);
+        } catch (final AuthenticationException ae) {
+            log.error("Invalid username/password combination", ae);
+            throw new ProcessException(ae);
+        }
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
+        ProcessorLog logger = getLogger();
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final long startNanos = System.nanoTime();
+        final long statementTimeout = 
context.getProperty(STATEMENT_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS);
+        final Charset charset = 
Charset.forName(context.getProperty(CHARSET).getValue());
+
+        // The documentation for the driver recommends the session remain open 
the entire time the processor is running
+        // and states that it is thread-safe. This is why connectionSession is 
not in a try-with-resources.
+        final Session connectionSession = cassandraSession.get();
+
+        String cql = getCQL(session, flowFile, charset);
+        try {
+            PreparedStatement statement = connectionSession.prepare(cql);
+            BoundStatement boundStatement = statement.bind();
+
+            Map<String, String> attributes = flowFile.getAttributes();
+            for (final Map.Entry<String, String> entry : 
attributes.entrySet()) {
+                final String key = entry.getKey();
+                final Matcher matcher = 
CQL_TYPE_ATTRIBUTE_PATTERN.matcher(key);
+                if (matcher.matches()) {
+                    final int parameterIndex = 
Integer.parseInt(matcher.group(1));
+                    String paramType = entry.getValue();
+                    if (StringUtils.isEmpty(paramType)) {
+                        throw new ProcessException("Value of the " + key + " 
attribute is null or empty, it must contain a valid value");
+                    }
+
+                    paramType = paramType.trim();
+                    final String valueAttrName = "cql.args." + parameterIndex 
+ ".value";
+                    final String parameterValue = 
attributes.get(valueAttrName);
+
+                    try {
+                        setStatementObject(boundStatement, parameterIndex - 1, 
valueAttrName, parameterValue, paramType);
+                    } catch (final InvalidTypeException | 
IllegalArgumentException e) {
+                        throw new ProcessException("The value of the " + 
valueAttrName + " is '" + parameterValue
+                                + "', which cannot be converted into the 
necessary data type: " + paramType, e);
+                    }
+                }
+            }
+
+            try {
+                ResultSetFuture future = 
connectionSession.executeAsync(boundStatement);
+                if (statementTimeout > 0) {
+                    future.getUninterruptibly(statementTimeout, 
TimeUnit.MILLISECONDS);
+                } else {
+                    future.getUninterruptibly();
+                }
+                // Emit a Provenance SEND event
+                final long transmissionMillis = 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
+
+                // This isn't a real URI but since Cassandra is distributed we 
just use the cluster name
+                String transitUri = "cassandra://" + 
connectionSession.getCluster().getMetadata().getClusterName();
+                session.getProvenanceReporter().send(flowFile, transitUri, 
transmissionMillis, true);
+                session.transfer(flowFile, REL_SUCCESS);
+
+            } catch (final TimeoutException e) {
+                throw new ProcessException(e);
+            }
+
+
+        } catch (final NoHostAvailableException nhae) {
+            getLogger().error("No host in the Cassandra cluster can be 
contacted successfully to execute this statement", nhae);
+            // Log up to 10 error messages. Otherwise if a 1000-node cluster 
was specified but there was no connectivity,
+            // a thousand error messages would be logged. However we would 
like information from Cassandra itself, so
+            // cap the error limit at 10, format the messages, and don't 
include the stack trace (it is displayed by the
+            // logger message above).
+            getLogger().error(nhae.getCustomMessage(10, true, false));
+            flowFile = session.penalize(flowFile);
+            session.transfer(flowFile, REL_RETRY);
+
+        } catch (final QueryExecutionException qee) {
+            logger.error("Cannot execute the statement with the requested 
consistency level successfully", qee);
+            flowFile = session.penalize(flowFile);
+            session.transfer(flowFile, REL_RETRY);
+
+        } catch (final QueryValidationException qve) {
+            logger.error("The CQL statement {} is invalid due to syntax error, 
authorization issue, or another "
+                            + "validation problem; routing {} to failure",
+                    new Object[]{cql, flowFile}, qve);
+            flowFile = session.penalize(flowFile);
+            session.transfer(flowFile, REL_FAILURE);
+
+        } catch (final ProcessException e) {
+            logger.error("Unable to execute CQL select statement {} for {} due 
to {}; routing to failure",
+                    new Object[]{cql, flowFile, e});
+            flowFile = session.penalize(flowFile);
+            session.transfer(flowFile, REL_FAILURE);
+        }
+    }
+
+    /**
+     * Determines the CQL statement that should be executed for the given 
FlowFile
+     *
+     * @param session  the session that can be used to access the given 
FlowFile
+     * @param flowFile the FlowFile whose CQL statement should be executed
+     * @return the CQL that is associated with the given FlowFile
+     */
+
+    private String getCQL(final ProcessSession session, final FlowFile 
flowFile, final Charset charset) {
+        // Read the CQL from the FlowFile's content
+        final byte[] buffer = new byte[(int) flowFile.getSize()];
+        session.read(flowFile, new InputStreamCallback() {
+            @Override
+            public void process(final InputStream in) throws IOException {
+                StreamUtils.fillBuffer(in, buffer);
+            }
+        });
+
+        // Create the PreparedStatement string to use for this FlowFile.
+        return new String(buffer, charset);
+    }
+
+    /**
+     * Binds one CQL parameter on the given statement, converting the string value into the Java
+     * representation that matches the declared Cassandra data type.
+     * <p>
+     * A {@code null} value is bound as CQL null regardless of the type. Otherwise the type string is
+     * parsed into a main type plus optional parameterized types (for collections) and the matching
+     * {@code BoundStatement} setter is invoked.
+     *
+     * @param statement  the BoundStatement for setting objects on
+     * @param paramIndex the index of the parameter at which to set the object
+     * @param attrName   the name of the attribute that the parameter is coming from - for logging purposes
+     * @param paramValue the value of the CQL parameter to set
+     * @param paramType  the Cassandra data type of the CQL parameter to set
+     * @throws IllegalArgumentException if the type is null, is not a recognized or supported Cassandra type,
+     *                                  or is a collection type without the parameterized type(s) it needs
+     */
+    protected void setStatementObject(final BoundStatement statement, final int paramIndex, final String attrName,
+                                      final String paramValue, final String paramType) throws IllegalArgumentException {
+        if (paramValue == null) {
+            statement.setToNull(paramIndex);
+            return;
+        }
+        if (paramType == null) {
+            throw new IllegalArgumentException("Parameter type for " + attrName + " cannot be null");
+        }
+
+        // Parse the top-level type and any parameterized types (for collections)
+        final Matcher matcher = CQL_TYPE_PATTERN.matcher(paramType);
+
+        // If the matcher doesn't match, this falls through to the exception at the bottom
+        if (matcher.find() && matcher.groupCount() > 1) {
+            // Locale.ROOT keeps the case conversion independent of the JVM's default locale
+            final String mainTypeString = matcher.group(1).toLowerCase(java.util.Locale.ROOT);
+            final DataType.Name mainTypeName = DataType.Name.valueOf(mainTypeString.toUpperCase(java.util.Locale.ROOT));
+
+            if (!mainTypeName.isCollection()) {
+                final DataType mainType = getPrimitiveDataTypeFromString(mainTypeString);
+
+                // Need the right statement.setXYZ() method. Only return when a setter was actually
+                // invoked; an unsupported type must reach the exception at the bottom instead of
+                // silently leaving the parameter unbound.
+                boolean bound = true;
+                if (mainType == null) {
+                    // Defensive: the lookup helper is defined elsewhere; treat a null result as unsupported
+                    bound = false;
+
+                } else if (mainType.equals(DataType.ascii())
+                        || mainType.equals(DataType.text())
+                        || mainType.equals(DataType.varchar())
+                        || mainType.equals(DataType.timestamp())
+                        || mainType.equals(DataType.timeuuid())
+                        || mainType.equals(DataType.uuid())
+                        || mainType.equals(DataType.inet())
+                        || mainType.equals(DataType.varint())) {
+                    // These are strings, so just use the paramValue
+                    statement.setString(paramIndex, paramValue);
+
+                } else if (mainType.equals(DataType.cboolean())) {
+                    statement.setBool(paramIndex, (boolean) mainType.parse(paramValue));
+
+                } else if (mainType.equals(DataType.cint())) {
+                    statement.setInt(paramIndex, (int) mainType.parse(paramValue));
+
+                } else if (mainType.equals(DataType.bigint())
+                        || mainType.equals(DataType.counter())) {
+                    statement.setLong(paramIndex, (long) mainType.parse(paramValue));
+
+                } else if (mainType.equals(DataType.cfloat())) {
+                    statement.setFloat(paramIndex, (float) mainType.parse(paramValue));
+
+                } else if (mainType.equals(DataType.cdouble())) {
+                    statement.setDouble(paramIndex, (double) mainType.parse(paramValue));
+
+                } else if (mainType.equals(DataType.blob())) {
+                    statement.setBytes(paramIndex, (ByteBuffer) mainType.parse(paramValue));
+
+                } else {
+                    bound = false;
+                }
+
+                if (bound) {
+                    return;
+                }
+            } else {
+                // Matcher.groupCount() only reports how many groups the pattern declares, so it cannot
+                // tell whether a type parameter was supplied; a group that did not take part in the
+                // match is returned as null and has to be checked explicitly.
+                final String firstParamTypeName = matcher.groupCount() > 2 ? matcher.group(3) : null;
+                if (firstParamTypeName == null) {
+                    throw new IllegalArgumentException(
+                            "Collection type " + mainTypeString + " needs parameterized type(s), such as set<text>");
+                }
+                final DataType firstParamType = getPrimitiveDataTypeFromString(firstParamTypeName);
+
+                if (mainTypeName == DataType.Name.MAP) {
+                    // A map additionally needs a value type; without one, fall through to the exception
+                    final String secondParamTypeName = matcher.groupCount() > 4 ? matcher.group(5) : null;
+                    if (secondParamTypeName != null) {
+                        final DataType secondParamType = getPrimitiveDataTypeFromString(secondParamTypeName);
+                        statement.setMap(paramIndex, (Map) DataType.map(firstParamType, secondParamType).parse(paramValue));
+                        return;
+                    }
+                } else if (mainTypeName == DataType.Name.SET) {
+                    statement.setSet(paramIndex, (Set) DataType.set(firstParamType).parse(paramValue));
+                    return;
+                } else if (mainTypeName == DataType.Name.LIST) {
+                    statement.setList(paramIndex, (List) DataType.list(firstParamType).parse(paramValue));
+                    return;
+                }
+            }
+        }
+
+        throw new IllegalArgumentException("Cannot create object of type " + paramType + " using input " + paramValue);
+    }
+
+    /**
+     * Lifecycle hook annotated with {@code @OnUnscheduled}. It does nothing beyond delegating to the
+     * superclass {@code stop()} implementation.
+     */
+    @OnUnscheduled
+    public void stop() {
+        super.stop();
+    }
+
+    /**
+     * Lifecycle hook annotated with {@code @OnShutdown}. It delegates to the superclass
+     * {@code stop()} implementation, i.e. the same call that {@link #stop()} makes.
+     */
+    @OnShutdown
+    public void shutdown() {
+        super.stop();
+    }
+
+}
\ No newline at end of file

Reply via email to