This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new 0e10e41  NIFI-4358 This closes #3363. cassandra connection enable 
compression at resquest and response
0e10e41 is described below

commit 0e10e417df4d63f3815874fbbf11fc3a450e67af
Author: Sandish Kumar <[email protected]>
AuthorDate: Mon Mar 11 00:10:09 2019 -0500

    NIFI-4358 This closes #3363. cassandra connection enable compression at 
resquest and response
    
    Signed-off-by: Joe Witt <[email protected]>
---
 .../cassandra/AbstractCassandraProcessor.java      |  22 +-
 .../cassandra/AbstractCassandraProcessorTest.java  |   2 +-
 .../processors/cassandra/PutCassandraQLTest.java   |   2 +-
 .../cassandra/PutCassandraRecordTest.java          | 396 +++++++-------
 .../processors/cassandra/QueryCassandraTest.java   |   2 +-
 .../nifi/service/CassandraSessionProvider.java     | 589 +++++++++++----------
 .../nifi/service/TestCassandraSessionProvider.java | 118 ++---
 7 files changed, 584 insertions(+), 547 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/AbstractCassandraProcessor.java
 
b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/AbstractCassandraProcessor.java
index 2e2b8a8..2c96671 100644
--- 
a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/AbstractCassandraProcessor.java
+++ 
b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/AbstractCassandraProcessor.java
@@ -22,6 +22,7 @@ import com.datastax.driver.core.ConsistencyLevel;
 import com.datastax.driver.core.DataType;
 import com.datastax.driver.core.JdkSSLOptions;
 import com.datastax.driver.core.Metadata;
+import com.datastax.driver.core.ProtocolOptions;
 import com.datastax.driver.core.Row;
 import com.datastax.driver.core.Session;
 import com.datastax.driver.core.TypeCodec;
@@ -137,6 +138,15 @@ public abstract class AbstractCassandraProcessor extends 
AbstractProcessor {
             .defaultValue("ONE")
             .build();
 
+    static final PropertyDescriptor COMPRESSION_TYPE = new 
PropertyDescriptor.Builder()
+            .name("Compression Type")
+            .description("Enable compression at transport-level requests and 
responses")
+            .required(false)
+            .allowableValues(ProtocolOptions.Compression.values())
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .defaultValue("NONE")
+            .build();
+
     static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
             .name("Character Set")
             .description("Specifies the character set of the record data.")
@@ -172,6 +182,7 @@ public abstract class AbstractCassandraProcessor extends 
AbstractProcessor {
         descriptors.add(USERNAME);
         descriptors.add(PASSWORD);
         descriptors.add(CONSISTENCY_LEVEL);
+        descriptors.add(COMPRESSION_TYPE);
         descriptors.add(CHARSET);
     }
 
@@ -238,6 +249,7 @@ public abstract class AbstractCassandraProcessor extends 
AbstractProcessor {
             ComponentLog log = getLogger();
             final String contactPointList = 
context.getProperty(CONTACT_POINTS).evaluateAttributeExpressions().getValue();
             final String consistencyLevel = 
context.getProperty(CONSISTENCY_LEVEL).getValue();
+            final String compressionType = 
context.getProperty(COMPRESSION_TYPE).getValue();
             List<InetSocketAddress> contactPoints = 
getContactPoints(contactPointList);
 
             // Set up the client for secure (SSL/TLS communications) if 
configured to do so
@@ -277,7 +289,7 @@ public abstract class AbstractCassandraProcessor extends 
AbstractProcessor {
             }
 
             // Create the cluster and connect to it
-            Cluster newCluster = createCluster(contactPoints, sslContext, 
username, password);
+            Cluster newCluster = createCluster(contactPoints, sslContext, 
username, password, compressionType);
             PropertyValue keyspaceProperty = 
context.getProperty(KEYSPACE).evaluateAttributeExpressions();
 
             final Session newSession;
@@ -304,16 +316,22 @@ public abstract class AbstractCassandraProcessor extends 
AbstractProcessor {
      * @param sslContext    The SSL context (used for secure connections)
      * @param username      The username for connection authentication
      * @param password      The password for connection authentication
+     * @param compressionType Enable compression at transport-level requests 
and responses.
      * @return A reference to the Cluster object associated with the given 
Cassandra configuration
      */
     protected Cluster createCluster(List<InetSocketAddress> contactPoints, 
SSLContext sslContext,
-                                    String username, String password) {
+                                    String username, String password, String 
compressionType) {
         Cluster.Builder builder = 
Cluster.builder().addContactPointsWithPorts(contactPoints);
         if (sslContext != null) {
             JdkSSLOptions sslOptions = JdkSSLOptions.builder()
                     .withSSLContext(sslContext)
                     .build();
             builder = builder.withSSL(sslOptions);
+            if(ProtocolOptions.Compression.SNAPPY.equals(compressionType)) {
+                builder = 
builder.withCompression(ProtocolOptions.Compression.SNAPPY);
+            } else if(ProtocolOptions.Compression.LZ4.equals(compressionType)) 
{
+                builder = 
builder.withCompression(ProtocolOptions.Compression.LZ4);
+            }
         }
         if (username != null && password != null) {
             builder = builder.withCredentials(username, password);
diff --git 
a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/AbstractCassandraProcessorTest.java
 
b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/AbstractCassandraProcessorTest.java
index 4401cd9..c8303e4 100644
--- 
a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/AbstractCassandraProcessorTest.java
+++ 
b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/AbstractCassandraProcessorTest.java
@@ -303,7 +303,7 @@ public class AbstractCassandraProcessorTest {
 
         @Override
         protected Cluster createCluster(List<InetSocketAddress> contactPoints, 
SSLContext sslContext,
-                                        String username, String password) {
+                                        String username, String password, 
String compressionType) {
             Cluster mockCluster = mock(Cluster.class);
             Metadata mockMetadata = mock(Metadata.class);
             when(mockMetadata.getClusterName()).thenReturn("cluster1");
diff --git 
a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/PutCassandraQLTest.java
 
b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/PutCassandraQLTest.java
index 60c3cda..7235e81 100644
--- 
a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/PutCassandraQLTest.java
+++ 
b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/PutCassandraQLTest.java
@@ -314,7 +314,7 @@ public class PutCassandraQLTest {
 
         @Override
         protected Cluster createCluster(List<InetSocketAddress> contactPoints, 
SSLContext sslContext,
-                                        String username, String password) {
+                                        String username, String password, 
String compressionType) {
             Cluster mockCluster = mock(Cluster.class);
             try {
                 Metadata mockMetadata = mock(Metadata.class);
diff --git 
a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/PutCassandraRecordTest.java
 
b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/PutCassandraRecordTest.java
index 88a9b5d..32a1145 100644
--- 
a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/PutCassandraRecordTest.java
+++ 
b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/PutCassandraRecordTest.java
@@ -1,198 +1,198 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.cassandra;
-
-import com.datastax.driver.core.BoundStatement;
-import com.datastax.driver.core.Cluster;
-import com.datastax.driver.core.Configuration;
-import com.datastax.driver.core.Metadata;
-import com.datastax.driver.core.PreparedStatement;
-import com.datastax.driver.core.ResultSet;
-import com.datastax.driver.core.ResultSetFuture;
-import com.datastax.driver.core.Session;
-import com.datastax.driver.core.Statement;
-import org.apache.nifi.reporting.InitializationException;
-import org.apache.nifi.serialization.record.MockRecordParser;
-import org.apache.nifi.serialization.record.RecordFieldType;
-import org.apache.nifi.util.TestRunner;
-import org.apache.nifi.util.TestRunners;
-import org.junit.Before;
-import org.junit.Test;
-
-import javax.net.ssl.SSLContext;
-import java.net.InetSocketAddress;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyLong;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-public class PutCassandraRecordTest {
-
-    private TestRunner testRunner;
-    private MockRecordParser recordReader;
-
-    @Before
-    public void setUp() throws Exception {
-        MockPutCassandraRecord processor = new MockPutCassandraRecord();
-        recordReader = new MockRecordParser();
-        testRunner = TestRunners.newTestRunner(processor);
-        testRunner.setProperty(PutCassandraRecord.RECORD_READER_FACTORY, 
"reader");
-    }
-
-    @Test
-    public void testProcessorConfigValidity() throws InitializationException {
-        testRunner.setProperty(PutCassandraRecord.CONTACT_POINTS, 
"localhost:9042");
-        testRunner.assertNotValid();
-
-        testRunner.setProperty(PutCassandraRecord.PASSWORD, "password");
-        testRunner.assertNotValid();
-
-        testRunner.setProperty(PutCassandraRecord.USERNAME, "username");
-        testRunner.assertNotValid();
-
-        testRunner.setProperty(PutCassandraRecord.CONSISTENCY_LEVEL, "SERIAL");
-        testRunner.assertNotValid();
-
-        testRunner.setProperty(PutCassandraRecord.BATCH_STATEMENT_TYPE, 
"LOGGED");
-        testRunner.assertNotValid();
-
-        testRunner.setProperty(PutCassandraRecord.KEYSPACE, "sampleks");
-        testRunner.assertNotValid();
-
-        testRunner.setProperty(PutCassandraRecord.TABLE, "sampletbl");
-        testRunner.assertNotValid();
-
-        testRunner.addControllerService("reader", recordReader);
-        testRunner.enableControllerService(recordReader);
-        testRunner.assertValid();
-    }
-
-    private void setUpStandardTestConfig() throws InitializationException {
-        testRunner.setProperty(AbstractCassandraProcessor.CONTACT_POINTS, 
"localhost:9042");
-        testRunner.setProperty(AbstractCassandraProcessor.PASSWORD, 
"password");
-        testRunner.setProperty(AbstractCassandraProcessor.USERNAME, 
"username");
-        testRunner.setProperty(PutCassandraRecord.CONSISTENCY_LEVEL, "SERIAL");
-        testRunner.setProperty(PutCassandraRecord.BATCH_STATEMENT_TYPE, 
"LOGGED");
-        testRunner.setProperty(PutCassandraRecord.TABLE, "sampleks.sampletbl");
-        testRunner.addControllerService("reader", recordReader);
-        testRunner.enableControllerService(recordReader);
-    }
-
-    @Test
-    public void testSimplePut() throws InitializationException {
-        setUpStandardTestConfig();
-
-        recordReader.addSchemaField("name", RecordFieldType.STRING);
-        recordReader.addSchemaField("age", RecordFieldType.INT);
-        recordReader.addSchemaField("sport", RecordFieldType.STRING);
-
-        recordReader.addRecord("John Doe", 48, "Soccer");
-        recordReader.addRecord("Jane Doe", 47, "Tennis");
-        recordReader.addRecord("Sally Doe", 47, "Curling");
-        recordReader.addRecord("Jimmy Doe", 14, null);
-        recordReader.addRecord("Pizza Doe", 14, null);
-
-        testRunner.enqueue("");
-        testRunner.run();
-
-        
testRunner.assertAllFlowFilesTransferred(PutCassandraRecord.REL_SUCCESS, 1);
-    }
-
-    @Test
-    public void testEL() throws InitializationException {
-        testRunner.setProperty(PutCassandraRecord.CONTACT_POINTS, 
"${contact.points}");
-        testRunner.setProperty(PutCassandraRecord.PASSWORD, "${pass}");
-        testRunner.setProperty(PutCassandraRecord.USERNAME, "${user}");
-        testRunner.setProperty(PutCassandraRecord.CONSISTENCY_LEVEL, "SERIAL");
-        testRunner.setProperty(PutCassandraRecord.BATCH_STATEMENT_TYPE, 
"LOGGED");
-        testRunner.setProperty(PutCassandraRecord.TABLE, "sampleks.sampletbl");
-        testRunner.addControllerService("reader", recordReader);
-        testRunner.enableControllerService(recordReader);
-
-        testRunner.assertValid();
-
-        testRunner.setVariable("contact.points", "localhost:9042");
-        testRunner.setVariable("user", "username");
-        testRunner.setVariable("pass", "password");
-
-        recordReader.addSchemaField("name", RecordFieldType.STRING);
-        recordReader.addSchemaField("age", RecordFieldType.INT);
-        recordReader.addSchemaField("sport", RecordFieldType.STRING);
-
-        recordReader.addRecord("John Doe", 48, "Soccer");
-        recordReader.addRecord("Jane Doe", 47, "Tennis");
-        recordReader.addRecord("Sally Doe", 47, "Curling");
-        recordReader.addRecord("Jimmy Doe", 14, null);
-        recordReader.addRecord("Pizza Doe", 14, null);
-
-        testRunner.enqueue("");
-        testRunner.run(1, true, true);
-        
testRunner.assertAllFlowFilesTransferred(PutCassandraRecord.REL_SUCCESS, 1);
-    }
-
-    private static class MockPutCassandraRecord extends PutCassandraRecord {
-        private Exception exceptionToThrow = null;
-        private Session mockSession = mock(Session.class);
-
-        @Override
-        protected Cluster createCluster(List<InetSocketAddress> contactPoints, 
SSLContext sslContext,
-                                        String username, String password) {
-            Cluster mockCluster = mock(Cluster.class);
-            try {
-                Metadata mockMetadata = mock(Metadata.class);
-                when(mockMetadata.getClusterName()).thenReturn("cluster1");
-                when(mockCluster.getMetadata()).thenReturn(mockMetadata);
-                when(mockCluster.connect()).thenReturn(mockSession);
-                when(mockCluster.connect(anyString())).thenReturn(mockSession);
-                Configuration config = Configuration.builder().build();
-                when(mockCluster.getConfiguration()).thenReturn(config);
-                ResultSetFuture future = mock(ResultSetFuture.class);
-                ResultSet rs = CassandraQueryTestUtil.createMockResultSet();
-                PreparedStatement ps = mock(PreparedStatement.class);
-                when(mockSession.prepare(anyString())).thenReturn(ps);
-                BoundStatement bs = mock(BoundStatement.class);
-                when(ps.bind()).thenReturn(bs);
-                when(future.getUninterruptibly()).thenReturn(rs);
-                try {
-                    doReturn(rs).when(future).getUninterruptibly(anyLong(), 
any(TimeUnit.class));
-                } catch (TimeoutException te) {
-                    throw new IllegalArgumentException("Mocked cluster doesn't 
time out");
-                }
-                if (exceptionToThrow != null) {
-                    
doThrow(exceptionToThrow).when(mockSession).executeAsync(anyString());
-                    
doThrow(exceptionToThrow).when(mockSession).executeAsync(any(Statement.class));
-
-                } else {
-                    
when(mockSession.executeAsync(anyString())).thenReturn(future);
-                    
when(mockSession.executeAsync(any(Statement.class))).thenReturn(future);
-                }
-                when(mockSession.getCluster()).thenReturn(mockCluster);
-            } catch (Exception e) {
-                fail(e.getMessage());
-            }
-            return mockCluster;
-        }
-    }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.cassandra;
+
+import com.datastax.driver.core.BoundStatement;
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Configuration;
+import com.datastax.driver.core.Metadata;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.ResultSetFuture;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.Statement;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.serialization.record.MockRecordParser;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.net.ssl.SSLContext;
+import java.net.InetSocketAddress;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class PutCassandraRecordTest {
+
+    private TestRunner testRunner;
+    private MockRecordParser recordReader;
+
+    @Before
+    public void setUp() throws Exception {
+        MockPutCassandraRecord processor = new MockPutCassandraRecord();
+        recordReader = new MockRecordParser();
+        testRunner = TestRunners.newTestRunner(processor);
+        testRunner.setProperty(PutCassandraRecord.RECORD_READER_FACTORY, 
"reader");
+    }
+
+    @Test
+    public void testProcessorConfigValidity() throws InitializationException {
+        testRunner.setProperty(PutCassandraRecord.CONTACT_POINTS, 
"localhost:9042");
+        testRunner.assertNotValid();
+
+        testRunner.setProperty(PutCassandraRecord.PASSWORD, "password");
+        testRunner.assertNotValid();
+
+        testRunner.setProperty(PutCassandraRecord.USERNAME, "username");
+        testRunner.assertNotValid();
+
+        testRunner.setProperty(PutCassandraRecord.CONSISTENCY_LEVEL, "SERIAL");
+        testRunner.assertNotValid();
+
+        testRunner.setProperty(PutCassandraRecord.BATCH_STATEMENT_TYPE, 
"LOGGED");
+        testRunner.assertNotValid();
+
+        testRunner.setProperty(PutCassandraRecord.KEYSPACE, "sampleks");
+        testRunner.assertNotValid();
+
+        testRunner.setProperty(PutCassandraRecord.TABLE, "sampletbl");
+        testRunner.assertNotValid();
+
+        testRunner.addControllerService("reader", recordReader);
+        testRunner.enableControllerService(recordReader);
+        testRunner.assertValid();
+    }
+
+    private void setUpStandardTestConfig() throws InitializationException {
+        testRunner.setProperty(AbstractCassandraProcessor.CONTACT_POINTS, 
"localhost:9042");
+        testRunner.setProperty(AbstractCassandraProcessor.PASSWORD, 
"password");
+        testRunner.setProperty(AbstractCassandraProcessor.USERNAME, 
"username");
+        testRunner.setProperty(PutCassandraRecord.CONSISTENCY_LEVEL, "SERIAL");
+        testRunner.setProperty(PutCassandraRecord.BATCH_STATEMENT_TYPE, 
"LOGGED");
+        testRunner.setProperty(PutCassandraRecord.TABLE, "sampleks.sampletbl");
+        testRunner.addControllerService("reader", recordReader);
+        testRunner.enableControllerService(recordReader);
+    }
+
+    @Test
+    public void testSimplePut() throws InitializationException {
+        setUpStandardTestConfig();
+
+        recordReader.addSchemaField("name", RecordFieldType.STRING);
+        recordReader.addSchemaField("age", RecordFieldType.INT);
+        recordReader.addSchemaField("sport", RecordFieldType.STRING);
+
+        recordReader.addRecord("John Doe", 48, "Soccer");
+        recordReader.addRecord("Jane Doe", 47, "Tennis");
+        recordReader.addRecord("Sally Doe", 47, "Curling");
+        recordReader.addRecord("Jimmy Doe", 14, null);
+        recordReader.addRecord("Pizza Doe", 14, null);
+
+        testRunner.enqueue("");
+        testRunner.run();
+
+        
testRunner.assertAllFlowFilesTransferred(PutCassandraRecord.REL_SUCCESS, 1);
+    }
+
+    @Test
+    public void testEL() throws InitializationException {
+        testRunner.setProperty(PutCassandraRecord.CONTACT_POINTS, 
"${contact.points}");
+        testRunner.setProperty(PutCassandraRecord.PASSWORD, "${pass}");
+        testRunner.setProperty(PutCassandraRecord.USERNAME, "${user}");
+        testRunner.setProperty(PutCassandraRecord.CONSISTENCY_LEVEL, "SERIAL");
+        testRunner.setProperty(PutCassandraRecord.BATCH_STATEMENT_TYPE, 
"LOGGED");
+        testRunner.setProperty(PutCassandraRecord.TABLE, "sampleks.sampletbl");
+        testRunner.addControllerService("reader", recordReader);
+        testRunner.enableControllerService(recordReader);
+
+        testRunner.assertValid();
+
+        testRunner.setVariable("contact.points", "localhost:9042");
+        testRunner.setVariable("user", "username");
+        testRunner.setVariable("pass", "password");
+
+        recordReader.addSchemaField("name", RecordFieldType.STRING);
+        recordReader.addSchemaField("age", RecordFieldType.INT);
+        recordReader.addSchemaField("sport", RecordFieldType.STRING);
+
+        recordReader.addRecord("John Doe", 48, "Soccer");
+        recordReader.addRecord("Jane Doe", 47, "Tennis");
+        recordReader.addRecord("Sally Doe", 47, "Curling");
+        recordReader.addRecord("Jimmy Doe", 14, null);
+        recordReader.addRecord("Pizza Doe", 14, null);
+
+        testRunner.enqueue("");
+        testRunner.run(1, true, true);
+        
testRunner.assertAllFlowFilesTransferred(PutCassandraRecord.REL_SUCCESS, 1);
+    }
+
+    private static class MockPutCassandraRecord extends PutCassandraRecord {
+        private Exception exceptionToThrow = null;
+        private Session mockSession = mock(Session.class);
+
+        @Override
+        protected Cluster createCluster(List<InetSocketAddress> contactPoints, 
SSLContext sslContext,
+                                        String username, String password, 
String compressionType) {
+            Cluster mockCluster = mock(Cluster.class);
+            try {
+                Metadata mockMetadata = mock(Metadata.class);
+                when(mockMetadata.getClusterName()).thenReturn("cluster1");
+                when(mockCluster.getMetadata()).thenReturn(mockMetadata);
+                when(mockCluster.connect()).thenReturn(mockSession);
+                when(mockCluster.connect(anyString())).thenReturn(mockSession);
+                Configuration config = Configuration.builder().build();
+                when(mockCluster.getConfiguration()).thenReturn(config);
+                ResultSetFuture future = mock(ResultSetFuture.class);
+                ResultSet rs = CassandraQueryTestUtil.createMockResultSet();
+                PreparedStatement ps = mock(PreparedStatement.class);
+                when(mockSession.prepare(anyString())).thenReturn(ps);
+                BoundStatement bs = mock(BoundStatement.class);
+                when(ps.bind()).thenReturn(bs);
+                when(future.getUninterruptibly()).thenReturn(rs);
+                try {
+                    doReturn(rs).when(future).getUninterruptibly(anyLong(), 
any(TimeUnit.class));
+                } catch (TimeoutException te) {
+                    throw new IllegalArgumentException("Mocked cluster doesn't 
time out");
+                }
+                if (exceptionToThrow != null) {
+                    
doThrow(exceptionToThrow).when(mockSession).executeAsync(anyString());
+                    
doThrow(exceptionToThrow).when(mockSession).executeAsync(any(Statement.class));
+
+                } else {
+                    
when(mockSession.executeAsync(anyString())).thenReturn(future);
+                    
when(mockSession.executeAsync(any(Statement.class))).thenReturn(future);
+                }
+                when(mockSession.getCluster()).thenReturn(mockCluster);
+            } catch (Exception e) {
+                fail(e.getMessage());
+            }
+            return mockCluster;
+        }
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/QueryCassandraTest.java
 
b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/QueryCassandraTest.java
index dfec386..c116512 100644
--- 
a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/QueryCassandraTest.java
+++ 
b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/QueryCassandraTest.java
@@ -385,7 +385,7 @@ public class QueryCassandraTest {
 
         @Override
         protected Cluster createCluster(List<InetSocketAddress> contactPoints, 
SSLContext sslContext,
-                                        String username, String password) {
+                                        String username, String password, 
String compressionType) {
             Cluster mockCluster = mock(Cluster.class);
             try {
                 Metadata mockMetadata = mock(Metadata.class);
diff --git 
a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-services/src/main/java/org/apache/nifi/service/CassandraSessionProvider.java
 
b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-services/src/main/java/org/apache/nifi/service/CassandraSessionProvider.java
index 3a26677..079b570 100644
--- 
a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-services/src/main/java/org/apache/nifi/service/CassandraSessionProvider.java
+++ 
b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-services/src/main/java/org/apache/nifi/service/CassandraSessionProvider.java
@@ -1,285 +1,304 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.service;
-
-import com.datastax.driver.core.Cluster;
-import com.datastax.driver.core.ConsistencyLevel;
-import com.datastax.driver.core.JdkSSLOptions;
-import com.datastax.driver.core.Metadata;
-import com.datastax.driver.core.Session;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.annotation.lifecycle.OnDisabled;
-import org.apache.nifi.annotation.lifecycle.OnEnabled;
-import org.apache.nifi.annotation.lifecycle.OnStopped;
-import org.apache.nifi.authentication.exception.ProviderCreationException;
-import org.apache.nifi.cassandra.CassandraSessionProviderService;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.components.PropertyValue;
-import org.apache.nifi.controller.AbstractControllerService;
-import org.apache.nifi.controller.ConfigurationContext;
-import org.apache.nifi.controller.ControllerServiceInitializationContext;
-import org.apache.nifi.expression.ExpressionLanguageScope;
-import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.security.util.SslContextFactory;
-import org.apache.nifi.ssl.SSLContextService;
-
-import javax.net.ssl.SSLContext;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-@Tags({"cassandra", "dbcp", "database", "connection", "pooling"})
-@CapabilityDescription("Provides connection session for Cassandra processors 
to work with Apache Cassandra.")
-public class CassandraSessionProvider extends AbstractControllerService 
implements CassandraSessionProviderService {
-
-    public static final int DEFAULT_CASSANDRA_PORT = 9042;
-
-    // Common descriptors
-    public static final PropertyDescriptor CONTACT_POINTS = new 
PropertyDescriptor.Builder()
-            .name("Cassandra Contact Points")
-            .description("Contact points are addresses of Cassandra nodes. The 
list of contact points should be "
-                    + "comma-separated and in hostname:port format. Example 
node1:port,node2:port,...."
-                    + " The default client port for Cassandra is 9042, but the 
port(s) must be explicitly specified.")
-            .required(true)
-            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
-            .addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR)
-            .build();
-
-    public static final PropertyDescriptor KEYSPACE = new 
PropertyDescriptor.Builder()
-            .name("Keyspace")
-            .description("The Cassandra Keyspace to connect to. If no keyspace 
is specified, the query will need to " +
-                    "include the keyspace name before any table reference, in 
case of 'query' native processors or " +
-                    "if the processor supports the 'Table' property, the 
keyspace name has to be provided with the " +
-                    "table name in the form of <KEYSPACE>.<TABLE>")
-            .required(false)
-            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .build();
-
-    public static final PropertyDescriptor PROP_SSL_CONTEXT_SERVICE = new 
PropertyDescriptor.Builder()
-            .name("SSL Context Service")
-            .description("The SSL Context Service used to provide client 
certificate information for TLS/SSL "
-                    + "connections.")
-            .required(false)
-            .identifiesControllerService(SSLContextService.class)
-            .build();
-
-    public static final PropertyDescriptor CLIENT_AUTH = new 
PropertyDescriptor.Builder()
-            .name("Client Auth")
-            .description("Client authentication policy when connecting to 
secure (TLS/SSL) cluster. "
-                    + "Possible values are REQUIRED, WANT, NONE. This property 
is only used when an SSL Context "
-                    + "has been defined and enabled.")
-            .required(false)
-            .allowableValues(SSLContextService.ClientAuth.values())
-            .defaultValue("REQUIRED")
-            .build();
-
-    public static final PropertyDescriptor USERNAME = new 
PropertyDescriptor.Builder()
-            .name("Username")
-            .description("Username to access the Cassandra cluster")
-            .required(false)
-            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .build();
-
-    public static final PropertyDescriptor PASSWORD = new 
PropertyDescriptor.Builder()
-            .name("Password")
-            .description("Password to access the Cassandra cluster")
-            .required(false)
-            .sensitive(true)
-            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .build();
-
-    public static final PropertyDescriptor CONSISTENCY_LEVEL = new 
PropertyDescriptor.Builder()
-            .name("Consistency Level")
-            .description("The strategy for how many replicas must respond 
before results are returned.")
-            .required(true)
-            .allowableValues(ConsistencyLevel.values())
-            .defaultValue("ONE")
-            .build();
-
-    private List<PropertyDescriptor> properties;
-    private Cluster cluster;
-    private Session cassandraSession;
-
-    @Override
-    public void init(final ControllerServiceInitializationContext context) {
-        List<PropertyDescriptor> props = new ArrayList<>();
-
-        props.add(CONTACT_POINTS);
-        props.add(CLIENT_AUTH);
-        props.add(CONSISTENCY_LEVEL);
-        props.add(KEYSPACE);
-        props.add(USERNAME);
-        props.add(PASSWORD);
-        props.add(PROP_SSL_CONTEXT_SERVICE);
-
-        properties = props;
-    }
-
-    @Override
-    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-        return properties;
-    }
-
-    @OnEnabled
-    public void onEnabled(final ConfigurationContext context) {
-        connectToCassandra(context);
-    }
-
-    @OnDisabled
-    public void onDisabled(){
-        if (cassandraSession != null) {
-            cassandraSession.close();
-        }
-        if (cluster != null) {
-            cluster.close();
-        }
-    }
-
-    @OnStopped
-    public void onStopped() {
-        if (cassandraSession != null) {
-            cassandraSession.close();
-        }
-        if (cluster != null) {
-            cluster.close();
-        }
-    }
-
-    @Override
-    public Cluster getCluster() {
-        if (cluster != null) {
-            return cluster;
-        } else {
-            throw new ProcessException("Unable to get the Cassandra cluster 
detail.");
-        }
-    }
-
-    @Override
-    public Session getCassandraSession() {
-        if (cassandraSession != null) {
-            return cassandraSession;
-        } else {
-            throw new ProcessException("Unable to get the Cassandra session.");
-        }
-    }
-
-    private void connectToCassandra(ConfigurationContext context) {
-        if (cluster == null) {
-            ComponentLog log = getLogger();
-            final String contactPointList = 
context.getProperty(CONTACT_POINTS).evaluateAttributeExpressions().getValue();
-            final String consistencyLevel = 
context.getProperty(CONSISTENCY_LEVEL).getValue();
-            List<InetSocketAddress> contactPoints = 
getContactPoints(contactPointList);
-
-            // Set up the client for secure (SSL/TLS communications) if 
configured to do so
-            final SSLContextService sslService =
-                    
context.getProperty(PROP_SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
-            final String rawClientAuth = 
context.getProperty(CLIENT_AUTH).getValue();
-            final SSLContext sslContext;
-
-            if (sslService != null) {
-                final SSLContextService.ClientAuth clientAuth;
-                if (StringUtils.isBlank(rawClientAuth)) {
-                    clientAuth = SSLContextService.ClientAuth.REQUIRED;
-                } else {
-                    try {
-                        clientAuth = 
SSLContextService.ClientAuth.valueOf(rawClientAuth);
-                    } catch (final IllegalArgumentException iae) {
-                        throw new 
ProviderCreationException(String.format("Unrecognized client auth '%s'. 
Possible values are [%s]",
-                                rawClientAuth, 
StringUtils.join(SslContextFactory.ClientAuth.values(), ", ")));
-                    }
-                }
-                sslContext = sslService.createSSLContext(clientAuth);
-            } else {
-                sslContext = null;
-            }
-
-            final String username, password;
-            PropertyValue usernameProperty = 
context.getProperty(USERNAME).evaluateAttributeExpressions();
-            PropertyValue passwordProperty = 
context.getProperty(PASSWORD).evaluateAttributeExpressions();
-
-            if (usernameProperty != null && passwordProperty != null) {
-                username = usernameProperty.getValue();
-                password = passwordProperty.getValue();
-            } else {
-                username = null;
-                password = null;
-            }
-
-            // Create the cluster and connect to it
-            Cluster newCluster = createCluster(contactPoints, sslContext, 
username, password);
-            PropertyValue keyspaceProperty = 
context.getProperty(KEYSPACE).evaluateAttributeExpressions();
-            final Session newSession;
-            if (keyspaceProperty != null) {
-                newSession = newCluster.connect(keyspaceProperty.getValue());
-            } else {
-                newSession = newCluster.connect();
-            }
-            
newCluster.getConfiguration().getQueryOptions().setConsistencyLevel(ConsistencyLevel.valueOf(consistencyLevel));
-            Metadata metadata = newCluster.getMetadata();
-            log.info("Connected to Cassandra cluster: {}", new 
Object[]{metadata.getClusterName()});
-
-            cluster = newCluster;
-            cassandraSession = newSession;
-        }
-    }
-
-    private List<InetSocketAddress> getContactPoints(String contactPointList) {
-
-        if (contactPointList == null) {
-            return null;
-        }
-
-        final List<String> contactPointStringList = 
Arrays.asList(contactPointList.split(","));
-        List<InetSocketAddress> contactPoints = new ArrayList<>();
-
-        for (String contactPointEntry : contactPointStringList) {
-            String[] addresses = contactPointEntry.split(":");
-            final String hostName = addresses[0].trim();
-            final int port = (addresses.length > 1) ? 
Integer.parseInt(addresses[1].trim()) : DEFAULT_CASSANDRA_PORT;
-
-            contactPoints.add(new InetSocketAddress(hostName, port));
-        }
-
-        return contactPoints;
-    }
-
-    private Cluster createCluster(List<InetSocketAddress> contactPoints, 
SSLContext sslContext,
-                                  String username, String password) {
-        Cluster.Builder builder = 
Cluster.builder().addContactPointsWithPorts(contactPoints);
-
-        if (sslContext != null) {
-            JdkSSLOptions sslOptions = JdkSSLOptions.builder()
-                    .withSSLContext(sslContext)
-                    .build();
-            builder = builder.withSSL(sslOptions);
-        }
-
-        if (username != null && password != null) {
-            builder = builder.withCredentials(username, password);
-        }
-
-        return builder.build();
-    }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.service;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.JdkSSLOptions;
+import com.datastax.driver.core.Metadata;
+import com.datastax.driver.core.ProtocolOptions;
+import com.datastax.driver.core.Session;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnDisabled;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.authentication.exception.ProviderCreationException;
+import org.apache.nifi.cassandra.CassandraSessionProviderService;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.controller.ControllerServiceInitializationContext;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.security.util.SslContextFactory;
+import org.apache.nifi.ssl.SSLContextService;
+
+import javax.net.ssl.SSLContext;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+@Tags({"cassandra", "dbcp", "database", "connection", "pooling"})
+@CapabilityDescription("Provides connection session for Cassandra processors 
to work with Apache Cassandra.")
+public class CassandraSessionProvider extends AbstractControllerService 
implements CassandraSessionProviderService {
+
+    public static final int DEFAULT_CASSANDRA_PORT = 9042;
+
+    // Common descriptors
+    public static final PropertyDescriptor CONTACT_POINTS = new 
PropertyDescriptor.Builder()
+            .name("Cassandra Contact Points")
+            .description("Contact points are addresses of Cassandra nodes. The 
list of contact points should be "
+                    + "comma-separated and in hostname:port format. Example 
node1:port,node2:port,...."
+                    + " The default client port for Cassandra is 9042, but the 
port(s) must be explicitly specified.")
+            .required(true)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor KEYSPACE = new 
PropertyDescriptor.Builder()
+            .name("Keyspace")
+            .description("The Cassandra Keyspace to connect to. If no keyspace 
is specified, the query will need to " +
+                    "include the keyspace name before any table reference, in 
case of 'query' native processors or " +
+                    "if the processor supports the 'Table' property, the 
keyspace name has to be provided with the " +
+                    "table name in the form of <KEYSPACE>.<TABLE>")
+            .required(false)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor PROP_SSL_CONTEXT_SERVICE = new 
PropertyDescriptor.Builder()
+            .name("SSL Context Service")
+            .description("The SSL Context Service used to provide client 
certificate information for TLS/SSL "
+                    + "connections.")
+            .required(false)
+            .identifiesControllerService(SSLContextService.class)
+            .build();
+
+    public static final PropertyDescriptor CLIENT_AUTH = new 
PropertyDescriptor.Builder()
+            .name("Client Auth")
+            .description("Client authentication policy when connecting to 
secure (TLS/SSL) cluster. "
+                    + "Possible values are REQUIRED, WANT, NONE. This property 
is only used when an SSL Context "
+                    + "has been defined and enabled.")
+            .required(false)
+            .allowableValues(SSLContextService.ClientAuth.values())
+            .defaultValue("REQUIRED")
+            .build();
+
+    public static final PropertyDescriptor USERNAME = new 
PropertyDescriptor.Builder()
+            .name("Username")
+            .description("Username to access the Cassandra cluster")
+            .required(false)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor PASSWORD = new 
PropertyDescriptor.Builder()
+            .name("Password")
+            .description("Password to access the Cassandra cluster")
+            .required(false)
+            .sensitive(true)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor CONSISTENCY_LEVEL = new 
PropertyDescriptor.Builder()
+            .name("Consistency Level")
+            .description("The strategy for how many replicas must respond 
before results are returned.")
+            .required(true)
+            .allowableValues(ConsistencyLevel.values())
+            .defaultValue("ONE")
+            .build();
+
+    static final PropertyDescriptor COMPRESSION_TYPE = new 
PropertyDescriptor.Builder()
+            .name("Compression Type")
+            .description("Enable compression at transport-level requests and 
responses")
+            .required(false)
+            .allowableValues(ProtocolOptions.Compression.values())
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .defaultValue("NONE")
+            .build();
+
+    private List<PropertyDescriptor> properties;
+    private Cluster cluster;
+    private Session cassandraSession;
+
+    @Override
+    public void init(final ControllerServiceInitializationContext context) {
+        List<PropertyDescriptor> props = new ArrayList<>();
+
+        props.add(CONTACT_POINTS);
+        props.add(CLIENT_AUTH);
+        props.add(CONSISTENCY_LEVEL);
+        props.add(COMPRESSION_TYPE);
+        props.add(KEYSPACE);
+        props.add(USERNAME);
+        props.add(PASSWORD);
+        props.add(PROP_SSL_CONTEXT_SERVICE);
+
+        properties = props;
+    }
+
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return properties;
+    }
+
+    @OnEnabled
+    public void onEnabled(final ConfigurationContext context) {
+        connectToCassandra(context);
+    }
+
+    @OnDisabled
+    public void onDisabled(){
+        if (cassandraSession != null) {
+            cassandraSession.close();
+        }
+        if (cluster != null) {
+            cluster.close();
+        }
+    }
+
+    @OnStopped
+    public void onStopped() {
+        if (cassandraSession != null) {
+            cassandraSession.close();
+        }
+        if (cluster != null) {
+            cluster.close();
+        }
+    }
+
+    @Override
+    public Cluster getCluster() {
+        if (cluster != null) {
+            return cluster;
+        } else {
+            throw new ProcessException("Unable to get the Cassandra cluster 
detail.");
+        }
+    }
+
+    @Override
+    public Session getCassandraSession() {
+        if (cassandraSession != null) {
+            return cassandraSession;
+        } else {
+            throw new ProcessException("Unable to get the Cassandra session.");
+        }
+    }
+
+    private void connectToCassandra(ConfigurationContext context) {
+        if (cluster == null) {
+            ComponentLog log = getLogger();
+            final String contactPointList = 
context.getProperty(CONTACT_POINTS).evaluateAttributeExpressions().getValue();
+            final String consistencyLevel = 
context.getProperty(CONSISTENCY_LEVEL).getValue();
+            final String compressionType = 
context.getProperty(COMPRESSION_TYPE).getValue();
+
+            List<InetSocketAddress> contactPoints = 
getContactPoints(contactPointList);
+
+            // Set up the client for secure (SSL/TLS communications) if 
configured to do so
+            final SSLContextService sslService =
+                    
context.getProperty(PROP_SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
+            final String rawClientAuth = 
context.getProperty(CLIENT_AUTH).getValue();
+            final SSLContext sslContext;
+
+            if (sslService != null) {
+                final SSLContextService.ClientAuth clientAuth;
+                if (StringUtils.isBlank(rawClientAuth)) {
+                    clientAuth = SSLContextService.ClientAuth.REQUIRED;
+                } else {
+                    try {
+                        clientAuth = 
SSLContextService.ClientAuth.valueOf(rawClientAuth);
+                    } catch (final IllegalArgumentException iae) {
+                        throw new 
ProviderCreationException(String.format("Unrecognized client auth '%s'. 
Possible values are [%s]",
+                                rawClientAuth, 
StringUtils.join(SslContextFactory.ClientAuth.values(), ", ")));
+                    }
+                }
+                sslContext = sslService.createSSLContext(clientAuth);
+            } else {
+                sslContext = null;
+            }
+
+            final String username, password;
+            PropertyValue usernameProperty = 
context.getProperty(USERNAME).evaluateAttributeExpressions();
+            PropertyValue passwordProperty = 
context.getProperty(PASSWORD).evaluateAttributeExpressions();
+
+            if (usernameProperty != null && passwordProperty != null) {
+                username = usernameProperty.getValue();
+                password = passwordProperty.getValue();
+            } else {
+                username = null;
+                password = null;
+            }
+
+            // Create the cluster and connect to it
+            Cluster newCluster = createCluster(contactPoints, sslContext, 
username, password, compressionType);
+            PropertyValue keyspaceProperty = 
context.getProperty(KEYSPACE).evaluateAttributeExpressions();
+            final Session newSession;
+            if (keyspaceProperty != null) {
+                newSession = newCluster.connect(keyspaceProperty.getValue());
+            } else {
+                newSession = newCluster.connect();
+            }
+            
newCluster.getConfiguration().getQueryOptions().setConsistencyLevel(ConsistencyLevel.valueOf(consistencyLevel));
+            Metadata metadata = newCluster.getMetadata();
+            log.info("Connected to Cassandra cluster: {}", new 
Object[]{metadata.getClusterName()});
+
+            cluster = newCluster;
+            cassandraSession = newSession;
+        }
+    }
+
+    private List<InetSocketAddress> getContactPoints(String contactPointList) {
+
+        if (contactPointList == null) {
+            return null;
+        }
+
+        final List<String> contactPointStringList = 
Arrays.asList(contactPointList.split(","));
+        List<InetSocketAddress> contactPoints = new ArrayList<>();
+
+        for (String contactPointEntry : contactPointStringList) {
+            String[] addresses = contactPointEntry.split(":");
+            final String hostName = addresses[0].trim();
+            final int port = (addresses.length > 1) ? 
Integer.parseInt(addresses[1].trim()) : DEFAULT_CASSANDRA_PORT;
+
+            contactPoints.add(new InetSocketAddress(hostName, port));
+        }
+
+        return contactPoints;
+    }
+
+    private Cluster createCluster(List<InetSocketAddress> contactPoints, 
SSLContext sslContext,
+                                  String username, String password, String 
compressionType) {
+        Cluster.Builder builder = 
Cluster.builder().addContactPointsWithPorts(contactPoints);
+
+        if (sslContext != null) {
+            JdkSSLOptions sslOptions = JdkSSLOptions.builder()
+                    .withSSLContext(sslContext)
+                    .build();
+            builder = builder.withSSL(sslOptions);
+        }
+
+        if (username != null && password != null) {
+            builder = builder.withCredentials(username, password);
+        }
+
+        if(ProtocolOptions.Compression.SNAPPY.equals(compressionType)) {
+            builder = 
builder.withCompression(ProtocolOptions.Compression.SNAPPY);
+        } else if(ProtocolOptions.Compression.LZ4.equals(compressionType)) {
+            builder = builder.withCompression(ProtocolOptions.Compression.LZ4);
+        }
+
+        return builder.build();
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-services/src/test/java/org/apache/nifi/service/TestCassandraSessionProvider.java
 
b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-services/src/test/java/org/apache/nifi/service/TestCassandraSessionProvider.java
index 2b688bc..2f49d02 100644
--- 
a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-services/src/test/java/org/apache/nifi/service/TestCassandraSessionProvider.java
+++ 
b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-services/src/test/java/org/apache/nifi/service/TestCassandraSessionProvider.java
@@ -1,59 +1,59 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.service;
-
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.reporting.InitializationException;
-import org.apache.nifi.util.TestRunner;
-import org.apache.nifi.util.TestRunners;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import java.util.List;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-public class TestCassandraSessionProvider {
-
-    private static TestRunner runner;
-    private static CassandraSessionProvider sessionProvider;
-
-    @BeforeClass
-    public static void setup() throws InitializationException {
-        MockCassandraProcessor mockCassandraProcessor = new 
MockCassandraProcessor();
-        sessionProvider = new CassandraSessionProvider();
-
-        runner = TestRunners.newTestRunner(mockCassandraProcessor);
-        runner.addControllerService("cassandra-session-provider", 
sessionProvider);
-    }
-
-    @Test
-    public void testGetPropertyDescriptors() {
-        List<PropertyDescriptor> properties = 
sessionProvider.getPropertyDescriptors();
-
-        assertEquals(7, properties.size());
-        assertTrue(properties.contains(CassandraSessionProvider.CLIENT_AUTH));
-        
assertTrue(properties.contains(CassandraSessionProvider.CONSISTENCY_LEVEL));
-        
assertTrue(properties.contains(CassandraSessionProvider.CONTACT_POINTS));
-        assertTrue(properties.contains(CassandraSessionProvider.KEYSPACE));
-        assertTrue(properties.contains(CassandraSessionProvider.PASSWORD));
-        
assertTrue(properties.contains(CassandraSessionProvider.PROP_SSL_CONTEXT_SERVICE));
-        assertTrue(properties.contains(CassandraSessionProvider.USERNAME));
-    }
-
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.service;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestCassandraSessionProvider {
+
+    private static TestRunner runner;
+    private static CassandraSessionProvider sessionProvider;
+
+    @BeforeClass
+    public static void setup() throws InitializationException {
+        MockCassandraProcessor mockCassandraProcessor = new 
MockCassandraProcessor();
+        sessionProvider = new CassandraSessionProvider();
+
+        runner = TestRunners.newTestRunner(mockCassandraProcessor);
+        runner.addControllerService("cassandra-session-provider", 
sessionProvider);
+    }
+
+    @Test
+    public void testGetPropertyDescriptors() {
+        List<PropertyDescriptor> properties = 
sessionProvider.getPropertyDescriptors();
+
+        assertEquals(8, properties.size());
+        assertTrue(properties.contains(CassandraSessionProvider.CLIENT_AUTH));
+        
assertTrue(properties.contains(CassandraSessionProvider.CONSISTENCY_LEVEL));
+        
assertTrue(properties.contains(CassandraSessionProvider.CONTACT_POINTS));
+        assertTrue(properties.contains(CassandraSessionProvider.KEYSPACE));
+        assertTrue(properties.contains(CassandraSessionProvider.PASSWORD));
+        
assertTrue(properties.contains(CassandraSessionProvider.PROP_SSL_CONTEXT_SERVICE));
+        assertTrue(properties.contains(CassandraSessionProvider.USERNAME));
+    }
+
+}

Reply via email to