This is an automated email from the ASF dual-hosted git repository.
joewitt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new 0e10e41 NIFI-4358 This closes #3363. cassandra connection enable
compression at resquest and response
0e10e41 is described below
commit 0e10e417df4d63f3815874fbbf11fc3a450e67af
Author: Sandish Kumar <[email protected]>
AuthorDate: Mon Mar 11 00:10:09 2019 -0500
NIFI-4358 This closes #3363. cassandra connection enable compression at
resquest and response
Signed-off-by: Joe Witt <[email protected]>
---
.../cassandra/AbstractCassandraProcessor.java | 22 +-
.../cassandra/AbstractCassandraProcessorTest.java | 2 +-
.../processors/cassandra/PutCassandraQLTest.java | 2 +-
.../cassandra/PutCassandraRecordTest.java | 396 +++++++-------
.../processors/cassandra/QueryCassandraTest.java | 2 +-
.../nifi/service/CassandraSessionProvider.java | 589 +++++++++++----------
.../nifi/service/TestCassandraSessionProvider.java | 118 ++---
7 files changed, 584 insertions(+), 547 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/AbstractCassandraProcessor.java
b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/AbstractCassandraProcessor.java
index 2e2b8a8..2c96671 100644
---
a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/AbstractCassandraProcessor.java
+++
b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/AbstractCassandraProcessor.java
@@ -22,6 +22,7 @@ import com.datastax.driver.core.ConsistencyLevel;
import com.datastax.driver.core.DataType;
import com.datastax.driver.core.JdkSSLOptions;
import com.datastax.driver.core.Metadata;
+import com.datastax.driver.core.ProtocolOptions;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.TypeCodec;
@@ -137,6 +138,15 @@ public abstract class AbstractCassandraProcessor extends
AbstractProcessor {
.defaultValue("ONE")
.build();
+ static final PropertyDescriptor COMPRESSION_TYPE = new
PropertyDescriptor.Builder()
+ .name("Compression Type")
+ .description("Enable compression at transport-level requests and
responses")
+ .required(false)
+ .allowableValues(ProtocolOptions.Compression.values())
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .defaultValue("NONE")
+ .build();
+
static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
.name("Character Set")
.description("Specifies the character set of the record data.")
@@ -172,6 +182,7 @@ public abstract class AbstractCassandraProcessor extends
AbstractProcessor {
descriptors.add(USERNAME);
descriptors.add(PASSWORD);
descriptors.add(CONSISTENCY_LEVEL);
+ descriptors.add(COMPRESSION_TYPE);
descriptors.add(CHARSET);
}
@@ -238,6 +249,7 @@ public abstract class AbstractCassandraProcessor extends
AbstractProcessor {
ComponentLog log = getLogger();
final String contactPointList =
context.getProperty(CONTACT_POINTS).evaluateAttributeExpressions().getValue();
final String consistencyLevel =
context.getProperty(CONSISTENCY_LEVEL).getValue();
+ final String compressionType =
context.getProperty(COMPRESSION_TYPE).getValue();
List<InetSocketAddress> contactPoints =
getContactPoints(contactPointList);
// Set up the client for secure (SSL/TLS communications) if
configured to do so
@@ -277,7 +289,7 @@ public abstract class AbstractCassandraProcessor extends
AbstractProcessor {
}
// Create the cluster and connect to it
- Cluster newCluster = createCluster(contactPoints, sslContext,
username, password);
+ Cluster newCluster = createCluster(contactPoints, sslContext,
username, password, compressionType);
PropertyValue keyspaceProperty =
context.getProperty(KEYSPACE).evaluateAttributeExpressions();
final Session newSession;
@@ -304,16 +316,22 @@ public abstract class AbstractCassandraProcessor extends
AbstractProcessor {
* @param sslContext The SSL context (used for secure connections)
* @param username The username for connection authentication
* @param password The password for connection authentication
+ * @param compressionType Enable compression at transport-level requests
and responses.
* @return A reference to the Cluster object associated with the given
Cassandra configuration
*/
protected Cluster createCluster(List<InetSocketAddress> contactPoints,
SSLContext sslContext,
- String username, String password) {
+ String username, String password, String
compressionType) {
Cluster.Builder builder =
Cluster.builder().addContactPointsWithPorts(contactPoints);
if (sslContext != null) {
JdkSSLOptions sslOptions = JdkSSLOptions.builder()
.withSSLContext(sslContext)
.build();
builder = builder.withSSL(sslOptions);
+ if(ProtocolOptions.Compression.SNAPPY.equals(compressionType)) {
+ builder =
builder.withCompression(ProtocolOptions.Compression.SNAPPY);
+ } else if(ProtocolOptions.Compression.LZ4.equals(compressionType))
{
+ builder =
builder.withCompression(ProtocolOptions.Compression.LZ4);
+ }
}
if (username != null && password != null) {
builder = builder.withCredentials(username, password);
diff --git
a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/AbstractCassandraProcessorTest.java
b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/AbstractCassandraProcessorTest.java
index 4401cd9..c8303e4 100644
---
a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/AbstractCassandraProcessorTest.java
+++
b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/AbstractCassandraProcessorTest.java
@@ -303,7 +303,7 @@ public class AbstractCassandraProcessorTest {
@Override
protected Cluster createCluster(List<InetSocketAddress> contactPoints,
SSLContext sslContext,
- String username, String password) {
+ String username, String password,
String compressionType) {
Cluster mockCluster = mock(Cluster.class);
Metadata mockMetadata = mock(Metadata.class);
when(mockMetadata.getClusterName()).thenReturn("cluster1");
diff --git
a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/PutCassandraQLTest.java
b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/PutCassandraQLTest.java
index 60c3cda..7235e81 100644
---
a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/PutCassandraQLTest.java
+++
b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/PutCassandraQLTest.java
@@ -314,7 +314,7 @@ public class PutCassandraQLTest {
@Override
protected Cluster createCluster(List<InetSocketAddress> contactPoints,
SSLContext sslContext,
- String username, String password) {
+ String username, String password,
String compressionType) {
Cluster mockCluster = mock(Cluster.class);
try {
Metadata mockMetadata = mock(Metadata.class);
diff --git
a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/PutCassandraRecordTest.java
b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/PutCassandraRecordTest.java
index 88a9b5d..32a1145 100644
---
a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/PutCassandraRecordTest.java
+++
b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/PutCassandraRecordTest.java
@@ -1,198 +1,198 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.cassandra;
-
-import com.datastax.driver.core.BoundStatement;
-import com.datastax.driver.core.Cluster;
-import com.datastax.driver.core.Configuration;
-import com.datastax.driver.core.Metadata;
-import com.datastax.driver.core.PreparedStatement;
-import com.datastax.driver.core.ResultSet;
-import com.datastax.driver.core.ResultSetFuture;
-import com.datastax.driver.core.Session;
-import com.datastax.driver.core.Statement;
-import org.apache.nifi.reporting.InitializationException;
-import org.apache.nifi.serialization.record.MockRecordParser;
-import org.apache.nifi.serialization.record.RecordFieldType;
-import org.apache.nifi.util.TestRunner;
-import org.apache.nifi.util.TestRunners;
-import org.junit.Before;
-import org.junit.Test;
-
-import javax.net.ssl.SSLContext;
-import java.net.InetSocketAddress;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyLong;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-public class PutCassandraRecordTest {
-
- private TestRunner testRunner;
- private MockRecordParser recordReader;
-
- @Before
- public void setUp() throws Exception {
- MockPutCassandraRecord processor = new MockPutCassandraRecord();
- recordReader = new MockRecordParser();
- testRunner = TestRunners.newTestRunner(processor);
- testRunner.setProperty(PutCassandraRecord.RECORD_READER_FACTORY,
"reader");
- }
-
- @Test
- public void testProcessorConfigValidity() throws InitializationException {
- testRunner.setProperty(PutCassandraRecord.CONTACT_POINTS,
"localhost:9042");
- testRunner.assertNotValid();
-
- testRunner.setProperty(PutCassandraRecord.PASSWORD, "password");
- testRunner.assertNotValid();
-
- testRunner.setProperty(PutCassandraRecord.USERNAME, "username");
- testRunner.assertNotValid();
-
- testRunner.setProperty(PutCassandraRecord.CONSISTENCY_LEVEL, "SERIAL");
- testRunner.assertNotValid();
-
- testRunner.setProperty(PutCassandraRecord.BATCH_STATEMENT_TYPE,
"LOGGED");
- testRunner.assertNotValid();
-
- testRunner.setProperty(PutCassandraRecord.KEYSPACE, "sampleks");
- testRunner.assertNotValid();
-
- testRunner.setProperty(PutCassandraRecord.TABLE, "sampletbl");
- testRunner.assertNotValid();
-
- testRunner.addControllerService("reader", recordReader);
- testRunner.enableControllerService(recordReader);
- testRunner.assertValid();
- }
-
- private void setUpStandardTestConfig() throws InitializationException {
- testRunner.setProperty(AbstractCassandraProcessor.CONTACT_POINTS,
"localhost:9042");
- testRunner.setProperty(AbstractCassandraProcessor.PASSWORD,
"password");
- testRunner.setProperty(AbstractCassandraProcessor.USERNAME,
"username");
- testRunner.setProperty(PutCassandraRecord.CONSISTENCY_LEVEL, "SERIAL");
- testRunner.setProperty(PutCassandraRecord.BATCH_STATEMENT_TYPE,
"LOGGED");
- testRunner.setProperty(PutCassandraRecord.TABLE, "sampleks.sampletbl");
- testRunner.addControllerService("reader", recordReader);
- testRunner.enableControllerService(recordReader);
- }
-
- @Test
- public void testSimplePut() throws InitializationException {
- setUpStandardTestConfig();
-
- recordReader.addSchemaField("name", RecordFieldType.STRING);
- recordReader.addSchemaField("age", RecordFieldType.INT);
- recordReader.addSchemaField("sport", RecordFieldType.STRING);
-
- recordReader.addRecord("John Doe", 48, "Soccer");
- recordReader.addRecord("Jane Doe", 47, "Tennis");
- recordReader.addRecord("Sally Doe", 47, "Curling");
- recordReader.addRecord("Jimmy Doe", 14, null);
- recordReader.addRecord("Pizza Doe", 14, null);
-
- testRunner.enqueue("");
- testRunner.run();
-
-
testRunner.assertAllFlowFilesTransferred(PutCassandraRecord.REL_SUCCESS, 1);
- }
-
- @Test
- public void testEL() throws InitializationException {
- testRunner.setProperty(PutCassandraRecord.CONTACT_POINTS,
"${contact.points}");
- testRunner.setProperty(PutCassandraRecord.PASSWORD, "${pass}");
- testRunner.setProperty(PutCassandraRecord.USERNAME, "${user}");
- testRunner.setProperty(PutCassandraRecord.CONSISTENCY_LEVEL, "SERIAL");
- testRunner.setProperty(PutCassandraRecord.BATCH_STATEMENT_TYPE,
"LOGGED");
- testRunner.setProperty(PutCassandraRecord.TABLE, "sampleks.sampletbl");
- testRunner.addControllerService("reader", recordReader);
- testRunner.enableControllerService(recordReader);
-
- testRunner.assertValid();
-
- testRunner.setVariable("contact.points", "localhost:9042");
- testRunner.setVariable("user", "username");
- testRunner.setVariable("pass", "password");
-
- recordReader.addSchemaField("name", RecordFieldType.STRING);
- recordReader.addSchemaField("age", RecordFieldType.INT);
- recordReader.addSchemaField("sport", RecordFieldType.STRING);
-
- recordReader.addRecord("John Doe", 48, "Soccer");
- recordReader.addRecord("Jane Doe", 47, "Tennis");
- recordReader.addRecord("Sally Doe", 47, "Curling");
- recordReader.addRecord("Jimmy Doe", 14, null);
- recordReader.addRecord("Pizza Doe", 14, null);
-
- testRunner.enqueue("");
- testRunner.run(1, true, true);
-
testRunner.assertAllFlowFilesTransferred(PutCassandraRecord.REL_SUCCESS, 1);
- }
-
- private static class MockPutCassandraRecord extends PutCassandraRecord {
- private Exception exceptionToThrow = null;
- private Session mockSession = mock(Session.class);
-
- @Override
- protected Cluster createCluster(List<InetSocketAddress> contactPoints,
SSLContext sslContext,
- String username, String password) {
- Cluster mockCluster = mock(Cluster.class);
- try {
- Metadata mockMetadata = mock(Metadata.class);
- when(mockMetadata.getClusterName()).thenReturn("cluster1");
- when(mockCluster.getMetadata()).thenReturn(mockMetadata);
- when(mockCluster.connect()).thenReturn(mockSession);
- when(mockCluster.connect(anyString())).thenReturn(mockSession);
- Configuration config = Configuration.builder().build();
- when(mockCluster.getConfiguration()).thenReturn(config);
- ResultSetFuture future = mock(ResultSetFuture.class);
- ResultSet rs = CassandraQueryTestUtil.createMockResultSet();
- PreparedStatement ps = mock(PreparedStatement.class);
- when(mockSession.prepare(anyString())).thenReturn(ps);
- BoundStatement bs = mock(BoundStatement.class);
- when(ps.bind()).thenReturn(bs);
- when(future.getUninterruptibly()).thenReturn(rs);
- try {
- doReturn(rs).when(future).getUninterruptibly(anyLong(),
any(TimeUnit.class));
- } catch (TimeoutException te) {
- throw new IllegalArgumentException("Mocked cluster doesn't
time out");
- }
- if (exceptionToThrow != null) {
-
doThrow(exceptionToThrow).when(mockSession).executeAsync(anyString());
-
doThrow(exceptionToThrow).when(mockSession).executeAsync(any(Statement.class));
-
- } else {
-
when(mockSession.executeAsync(anyString())).thenReturn(future);
-
when(mockSession.executeAsync(any(Statement.class))).thenReturn(future);
- }
- when(mockSession.getCluster()).thenReturn(mockCluster);
- } catch (Exception e) {
- fail(e.getMessage());
- }
- return mockCluster;
- }
- }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.cassandra;
+
+import com.datastax.driver.core.BoundStatement;
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Configuration;
+import com.datastax.driver.core.Metadata;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.ResultSetFuture;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.Statement;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.serialization.record.MockRecordParser;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.net.ssl.SSLContext;
+import java.net.InetSocketAddress;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class PutCassandraRecordTest {
+
+ private TestRunner testRunner;
+ private MockRecordParser recordReader;
+
+ @Before
+ public void setUp() throws Exception {
+ MockPutCassandraRecord processor = new MockPutCassandraRecord();
+ recordReader = new MockRecordParser();
+ testRunner = TestRunners.newTestRunner(processor);
+ testRunner.setProperty(PutCassandraRecord.RECORD_READER_FACTORY,
"reader");
+ }
+
+ @Test
+ public void testProcessorConfigValidity() throws InitializationException {
+ testRunner.setProperty(PutCassandraRecord.CONTACT_POINTS,
"localhost:9042");
+ testRunner.assertNotValid();
+
+ testRunner.setProperty(PutCassandraRecord.PASSWORD, "password");
+ testRunner.assertNotValid();
+
+ testRunner.setProperty(PutCassandraRecord.USERNAME, "username");
+ testRunner.assertNotValid();
+
+ testRunner.setProperty(PutCassandraRecord.CONSISTENCY_LEVEL, "SERIAL");
+ testRunner.assertNotValid();
+
+ testRunner.setProperty(PutCassandraRecord.BATCH_STATEMENT_TYPE,
"LOGGED");
+ testRunner.assertNotValid();
+
+ testRunner.setProperty(PutCassandraRecord.KEYSPACE, "sampleks");
+ testRunner.assertNotValid();
+
+ testRunner.setProperty(PutCassandraRecord.TABLE, "sampletbl");
+ testRunner.assertNotValid();
+
+ testRunner.addControllerService("reader", recordReader);
+ testRunner.enableControllerService(recordReader);
+ testRunner.assertValid();
+ }
+
+ private void setUpStandardTestConfig() throws InitializationException {
+ testRunner.setProperty(AbstractCassandraProcessor.CONTACT_POINTS,
"localhost:9042");
+ testRunner.setProperty(AbstractCassandraProcessor.PASSWORD,
"password");
+ testRunner.setProperty(AbstractCassandraProcessor.USERNAME,
"username");
+ testRunner.setProperty(PutCassandraRecord.CONSISTENCY_LEVEL, "SERIAL");
+ testRunner.setProperty(PutCassandraRecord.BATCH_STATEMENT_TYPE,
"LOGGED");
+ testRunner.setProperty(PutCassandraRecord.TABLE, "sampleks.sampletbl");
+ testRunner.addControllerService("reader", recordReader);
+ testRunner.enableControllerService(recordReader);
+ }
+
+ @Test
+ public void testSimplePut() throws InitializationException {
+ setUpStandardTestConfig();
+
+ recordReader.addSchemaField("name", RecordFieldType.STRING);
+ recordReader.addSchemaField("age", RecordFieldType.INT);
+ recordReader.addSchemaField("sport", RecordFieldType.STRING);
+
+ recordReader.addRecord("John Doe", 48, "Soccer");
+ recordReader.addRecord("Jane Doe", 47, "Tennis");
+ recordReader.addRecord("Sally Doe", 47, "Curling");
+ recordReader.addRecord("Jimmy Doe", 14, null);
+ recordReader.addRecord("Pizza Doe", 14, null);
+
+ testRunner.enqueue("");
+ testRunner.run();
+
+
testRunner.assertAllFlowFilesTransferred(PutCassandraRecord.REL_SUCCESS, 1);
+ }
+
+ @Test
+ public void testEL() throws InitializationException {
+ testRunner.setProperty(PutCassandraRecord.CONTACT_POINTS,
"${contact.points}");
+ testRunner.setProperty(PutCassandraRecord.PASSWORD, "${pass}");
+ testRunner.setProperty(PutCassandraRecord.USERNAME, "${user}");
+ testRunner.setProperty(PutCassandraRecord.CONSISTENCY_LEVEL, "SERIAL");
+ testRunner.setProperty(PutCassandraRecord.BATCH_STATEMENT_TYPE,
"LOGGED");
+ testRunner.setProperty(PutCassandraRecord.TABLE, "sampleks.sampletbl");
+ testRunner.addControllerService("reader", recordReader);
+ testRunner.enableControllerService(recordReader);
+
+ testRunner.assertValid();
+
+ testRunner.setVariable("contact.points", "localhost:9042");
+ testRunner.setVariable("user", "username");
+ testRunner.setVariable("pass", "password");
+
+ recordReader.addSchemaField("name", RecordFieldType.STRING);
+ recordReader.addSchemaField("age", RecordFieldType.INT);
+ recordReader.addSchemaField("sport", RecordFieldType.STRING);
+
+ recordReader.addRecord("John Doe", 48, "Soccer");
+ recordReader.addRecord("Jane Doe", 47, "Tennis");
+ recordReader.addRecord("Sally Doe", 47, "Curling");
+ recordReader.addRecord("Jimmy Doe", 14, null);
+ recordReader.addRecord("Pizza Doe", 14, null);
+
+ testRunner.enqueue("");
+ testRunner.run(1, true, true);
+
testRunner.assertAllFlowFilesTransferred(PutCassandraRecord.REL_SUCCESS, 1);
+ }
+
+ private static class MockPutCassandraRecord extends PutCassandraRecord {
+ private Exception exceptionToThrow = null;
+ private Session mockSession = mock(Session.class);
+
+ @Override
+ protected Cluster createCluster(List<InetSocketAddress> contactPoints,
SSLContext sslContext,
+ String username, String password,
String compressionType) {
+ Cluster mockCluster = mock(Cluster.class);
+ try {
+ Metadata mockMetadata = mock(Metadata.class);
+ when(mockMetadata.getClusterName()).thenReturn("cluster1");
+ when(mockCluster.getMetadata()).thenReturn(mockMetadata);
+ when(mockCluster.connect()).thenReturn(mockSession);
+ when(mockCluster.connect(anyString())).thenReturn(mockSession);
+ Configuration config = Configuration.builder().build();
+ when(mockCluster.getConfiguration()).thenReturn(config);
+ ResultSetFuture future = mock(ResultSetFuture.class);
+ ResultSet rs = CassandraQueryTestUtil.createMockResultSet();
+ PreparedStatement ps = mock(PreparedStatement.class);
+ when(mockSession.prepare(anyString())).thenReturn(ps);
+ BoundStatement bs = mock(BoundStatement.class);
+ when(ps.bind()).thenReturn(bs);
+ when(future.getUninterruptibly()).thenReturn(rs);
+ try {
+ doReturn(rs).when(future).getUninterruptibly(anyLong(),
any(TimeUnit.class));
+ } catch (TimeoutException te) {
+ throw new IllegalArgumentException("Mocked cluster doesn't
time out");
+ }
+ if (exceptionToThrow != null) {
+
doThrow(exceptionToThrow).when(mockSession).executeAsync(anyString());
+
doThrow(exceptionToThrow).when(mockSession).executeAsync(any(Statement.class));
+
+ } else {
+
when(mockSession.executeAsync(anyString())).thenReturn(future);
+
when(mockSession.executeAsync(any(Statement.class))).thenReturn(future);
+ }
+ when(mockSession.getCluster()).thenReturn(mockCluster);
+ } catch (Exception e) {
+ fail(e.getMessage());
+ }
+ return mockCluster;
+ }
+ }
+}
diff --git
a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/QueryCassandraTest.java
b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/QueryCassandraTest.java
index dfec386..c116512 100644
---
a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/QueryCassandraTest.java
+++
b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/QueryCassandraTest.java
@@ -385,7 +385,7 @@ public class QueryCassandraTest {
@Override
protected Cluster createCluster(List<InetSocketAddress> contactPoints,
SSLContext sslContext,
- String username, String password) {
+ String username, String password,
String compressionType) {
Cluster mockCluster = mock(Cluster.class);
try {
Metadata mockMetadata = mock(Metadata.class);
diff --git
a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-services/src/main/java/org/apache/nifi/service/CassandraSessionProvider.java
b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-services/src/main/java/org/apache/nifi/service/CassandraSessionProvider.java
index 3a26677..079b570 100644
---
a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-services/src/main/java/org/apache/nifi/service/CassandraSessionProvider.java
+++
b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-services/src/main/java/org/apache/nifi/service/CassandraSessionProvider.java
@@ -1,285 +1,304 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.service;
-
-import com.datastax.driver.core.Cluster;
-import com.datastax.driver.core.ConsistencyLevel;
-import com.datastax.driver.core.JdkSSLOptions;
-import com.datastax.driver.core.Metadata;
-import com.datastax.driver.core.Session;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.annotation.lifecycle.OnDisabled;
-import org.apache.nifi.annotation.lifecycle.OnEnabled;
-import org.apache.nifi.annotation.lifecycle.OnStopped;
-import org.apache.nifi.authentication.exception.ProviderCreationException;
-import org.apache.nifi.cassandra.CassandraSessionProviderService;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.components.PropertyValue;
-import org.apache.nifi.controller.AbstractControllerService;
-import org.apache.nifi.controller.ConfigurationContext;
-import org.apache.nifi.controller.ControllerServiceInitializationContext;
-import org.apache.nifi.expression.ExpressionLanguageScope;
-import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.security.util.SslContextFactory;
-import org.apache.nifi.ssl.SSLContextService;
-
-import javax.net.ssl.SSLContext;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-@Tags({"cassandra", "dbcp", "database", "connection", "pooling"})
-@CapabilityDescription("Provides connection session for Cassandra processors
to work with Apache Cassandra.")
-public class CassandraSessionProvider extends AbstractControllerService
implements CassandraSessionProviderService {
-
- public static final int DEFAULT_CASSANDRA_PORT = 9042;
-
- // Common descriptors
- public static final PropertyDescriptor CONTACT_POINTS = new
PropertyDescriptor.Builder()
- .name("Cassandra Contact Points")
- .description("Contact points are addresses of Cassandra nodes. The
list of contact points should be "
- + "comma-separated and in hostname:port format. Example
node1:port,node2:port,...."
- + " The default client port for Cassandra is 9042, but the
port(s) must be explicitly specified.")
- .required(true)
-
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
- .addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR)
- .build();
-
- public static final PropertyDescriptor KEYSPACE = new
PropertyDescriptor.Builder()
- .name("Keyspace")
- .description("The Cassandra Keyspace to connect to. If no keyspace
is specified, the query will need to " +
- "include the keyspace name before any table reference, in
case of 'query' native processors or " +
- "if the processor supports the 'Table' property, the
keyspace name has to be provided with the " +
- "table name in the form of <KEYSPACE>.<TABLE>")
- .required(false)
-
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .build();
-
- public static final PropertyDescriptor PROP_SSL_CONTEXT_SERVICE = new
PropertyDescriptor.Builder()
- .name("SSL Context Service")
- .description("The SSL Context Service used to provide client
certificate information for TLS/SSL "
- + "connections.")
- .required(false)
- .identifiesControllerService(SSLContextService.class)
- .build();
-
- public static final PropertyDescriptor CLIENT_AUTH = new
PropertyDescriptor.Builder()
- .name("Client Auth")
- .description("Client authentication policy when connecting to
secure (TLS/SSL) cluster. "
- + "Possible values are REQUIRED, WANT, NONE. This property
is only used when an SSL Context "
- + "has been defined and enabled.")
- .required(false)
- .allowableValues(SSLContextService.ClientAuth.values())
- .defaultValue("REQUIRED")
- .build();
-
- public static final PropertyDescriptor USERNAME = new
PropertyDescriptor.Builder()
- .name("Username")
- .description("Username to access the Cassandra cluster")
- .required(false)
-
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .build();
-
- public static final PropertyDescriptor PASSWORD = new
PropertyDescriptor.Builder()
- .name("Password")
- .description("Password to access the Cassandra cluster")
- .required(false)
- .sensitive(true)
-
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .build();
-
- public static final PropertyDescriptor CONSISTENCY_LEVEL = new
PropertyDescriptor.Builder()
- .name("Consistency Level")
- .description("The strategy for how many replicas must respond
before results are returned.")
- .required(true)
- .allowableValues(ConsistencyLevel.values())
- .defaultValue("ONE")
- .build();
-
- private List<PropertyDescriptor> properties;
- private Cluster cluster;
- private Session cassandraSession;
-
- @Override
- public void init(final ControllerServiceInitializationContext context) {
- List<PropertyDescriptor> props = new ArrayList<>();
-
- props.add(CONTACT_POINTS);
- props.add(CLIENT_AUTH);
- props.add(CONSISTENCY_LEVEL);
- props.add(KEYSPACE);
- props.add(USERNAME);
- props.add(PASSWORD);
- props.add(PROP_SSL_CONTEXT_SERVICE);
-
- properties = props;
- }
-
- @Override
- public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
- return properties;
- }
-
- @OnEnabled
- public void onEnabled(final ConfigurationContext context) {
- connectToCassandra(context);
- }
-
- @OnDisabled
- public void onDisabled(){
- if (cassandraSession != null) {
- cassandraSession.close();
- }
- if (cluster != null) {
- cluster.close();
- }
- }
-
- @OnStopped
- public void onStopped() {
- if (cassandraSession != null) {
- cassandraSession.close();
- }
- if (cluster != null) {
- cluster.close();
- }
- }
-
- @Override
- public Cluster getCluster() {
- if (cluster != null) {
- return cluster;
- } else {
- throw new ProcessException("Unable to get the Cassandra cluster
detail.");
- }
- }
-
- @Override
- public Session getCassandraSession() {
- if (cassandraSession != null) {
- return cassandraSession;
- } else {
- throw new ProcessException("Unable to get the Cassandra session.");
- }
- }
-
- private void connectToCassandra(ConfigurationContext context) {
- if (cluster == null) {
- ComponentLog log = getLogger();
- final String contactPointList =
context.getProperty(CONTACT_POINTS).evaluateAttributeExpressions().getValue();
- final String consistencyLevel =
context.getProperty(CONSISTENCY_LEVEL).getValue();
- List<InetSocketAddress> contactPoints =
getContactPoints(contactPointList);
-
- // Set up the client for secure (SSL/TLS communications) if
configured to do so
- final SSLContextService sslService =
-
context.getProperty(PROP_SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
- final String rawClientAuth =
context.getProperty(CLIENT_AUTH).getValue();
- final SSLContext sslContext;
-
- if (sslService != null) {
- final SSLContextService.ClientAuth clientAuth;
- if (StringUtils.isBlank(rawClientAuth)) {
- clientAuth = SSLContextService.ClientAuth.REQUIRED;
- } else {
- try {
- clientAuth =
SSLContextService.ClientAuth.valueOf(rawClientAuth);
- } catch (final IllegalArgumentException iae) {
- throw new
ProviderCreationException(String.format("Unrecognized client auth '%s'.
Possible values are [%s]",
- rawClientAuth,
StringUtils.join(SslContextFactory.ClientAuth.values(), ", ")));
- }
- }
- sslContext = sslService.createSSLContext(clientAuth);
- } else {
- sslContext = null;
- }
-
- final String username, password;
- PropertyValue usernameProperty =
context.getProperty(USERNAME).evaluateAttributeExpressions();
- PropertyValue passwordProperty =
context.getProperty(PASSWORD).evaluateAttributeExpressions();
-
- if (usernameProperty != null && passwordProperty != null) {
- username = usernameProperty.getValue();
- password = passwordProperty.getValue();
- } else {
- username = null;
- password = null;
- }
-
- // Create the cluster and connect to it
- Cluster newCluster = createCluster(contactPoints, sslContext,
username, password);
- PropertyValue keyspaceProperty =
context.getProperty(KEYSPACE).evaluateAttributeExpressions();
- final Session newSession;
- if (keyspaceProperty != null) {
- newSession = newCluster.connect(keyspaceProperty.getValue());
- } else {
- newSession = newCluster.connect();
- }
-
newCluster.getConfiguration().getQueryOptions().setConsistencyLevel(ConsistencyLevel.valueOf(consistencyLevel));
- Metadata metadata = newCluster.getMetadata();
- log.info("Connected to Cassandra cluster: {}", new
Object[]{metadata.getClusterName()});
-
- cluster = newCluster;
- cassandraSession = newSession;
- }
- }
-
- private List<InetSocketAddress> getContactPoints(String contactPointList) {
-
- if (contactPointList == null) {
- return null;
- }
-
- final List<String> contactPointStringList =
Arrays.asList(contactPointList.split(","));
- List<InetSocketAddress> contactPoints = new ArrayList<>();
-
- for (String contactPointEntry : contactPointStringList) {
- String[] addresses = contactPointEntry.split(":");
- final String hostName = addresses[0].trim();
- final int port = (addresses.length > 1) ?
Integer.parseInt(addresses[1].trim()) : DEFAULT_CASSANDRA_PORT;
-
- contactPoints.add(new InetSocketAddress(hostName, port));
- }
-
- return contactPoints;
- }
-
- private Cluster createCluster(List<InetSocketAddress> contactPoints,
SSLContext sslContext,
- String username, String password) {
- Cluster.Builder builder =
Cluster.builder().addContactPointsWithPorts(contactPoints);
-
- if (sslContext != null) {
- JdkSSLOptions sslOptions = JdkSSLOptions.builder()
- .withSSLContext(sslContext)
- .build();
- builder = builder.withSSL(sslOptions);
- }
-
- if (username != null && password != null) {
- builder = builder.withCredentials(username, password);
- }
-
- return builder.build();
- }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.service;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.JdkSSLOptions;
+import com.datastax.driver.core.Metadata;
+import com.datastax.driver.core.ProtocolOptions;
+import com.datastax.driver.core.Session;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnDisabled;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.authentication.exception.ProviderCreationException;
+import org.apache.nifi.cassandra.CassandraSessionProviderService;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.controller.ControllerServiceInitializationContext;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.security.util.SslContextFactory;
+import org.apache.nifi.ssl.SSLContextService;
+
+import javax.net.ssl.SSLContext;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+@Tags({"cassandra", "dbcp", "database", "connection", "pooling"})
+@CapabilityDescription("Provides connection session for Cassandra processors
to work with Apache Cassandra.")
+public class CassandraSessionProvider extends AbstractControllerService
implements CassandraSessionProviderService {
+
+ public static final int DEFAULT_CASSANDRA_PORT = 9042;
+
+ // Common descriptors
+ public static final PropertyDescriptor CONTACT_POINTS = new
PropertyDescriptor.Builder()
+ .name("Cassandra Contact Points")
+ .description("Contact points are addresses of Cassandra nodes. The
list of contact points should be "
+ + "comma-separated and in hostname:port format. Example
node1:port,node2:port,...."
+ + " The default client port for Cassandra is 9042, but the
port(s) must be explicitly specified.")
+ .required(true)
+
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor KEYSPACE = new
PropertyDescriptor.Builder()
+ .name("Keyspace")
+ .description("The Cassandra Keyspace to connect to. If no keyspace
is specified, the query will need to " +
+ "include the keyspace name before any table reference, in
case of 'query' native processors or " +
+ "if the processor supports the 'Table' property, the
keyspace name has to be provided with the " +
+ "table name in the form of <KEYSPACE>.<TABLE>")
+ .required(false)
+
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor PROP_SSL_CONTEXT_SERVICE = new
PropertyDescriptor.Builder()
+ .name("SSL Context Service")
+ .description("The SSL Context Service used to provide client
certificate information for TLS/SSL "
+ + "connections.")
+ .required(false)
+ .identifiesControllerService(SSLContextService.class)
+ .build();
+
+ public static final PropertyDescriptor CLIENT_AUTH = new
PropertyDescriptor.Builder()
+ .name("Client Auth")
+ .description("Client authentication policy when connecting to
secure (TLS/SSL) cluster. "
+ + "Possible values are REQUIRED, WANT, NONE. This property
is only used when an SSL Context "
+ + "has been defined and enabled.")
+ .required(false)
+ .allowableValues(SSLContextService.ClientAuth.values())
+ .defaultValue("REQUIRED")
+ .build();
+
+ public static final PropertyDescriptor USERNAME = new
PropertyDescriptor.Builder()
+ .name("Username")
+ .description("Username to access the Cassandra cluster")
+ .required(false)
+
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor PASSWORD = new
PropertyDescriptor.Builder()
+ .name("Password")
+ .description("Password to access the Cassandra cluster")
+ .required(false)
+ .sensitive(true)
+
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor CONSISTENCY_LEVEL = new
PropertyDescriptor.Builder()
+ .name("Consistency Level")
+ .description("The strategy for how many replicas must respond
before results are returned.")
+ .required(true)
+ .allowableValues(ConsistencyLevel.values())
+ .defaultValue("ONE")
+ .build();
+
+ static final PropertyDescriptor COMPRESSION_TYPE = new
PropertyDescriptor.Builder()
+ .name("Compression Type")
+ .description("Enable compression at transport-level requests and
responses")
+ .required(false)
+ .allowableValues(ProtocolOptions.Compression.values())
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .defaultValue("NONE")
+ .build();
+
+ private List<PropertyDescriptor> properties;
+ private Cluster cluster;
+ private Session cassandraSession;
+
+ @Override
+ public void init(final ControllerServiceInitializationContext context) {
+ List<PropertyDescriptor> props = new ArrayList<>();
+
+ props.add(CONTACT_POINTS);
+ props.add(CLIENT_AUTH);
+ props.add(CONSISTENCY_LEVEL);
+ props.add(COMPRESSION_TYPE);
+ props.add(KEYSPACE);
+ props.add(USERNAME);
+ props.add(PASSWORD);
+ props.add(PROP_SSL_CONTEXT_SERVICE);
+
+ properties = props;
+ }
+
+ @Override
+ public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return properties;
+ }
+
+ @OnEnabled
+ public void onEnabled(final ConfigurationContext context) {
+ connectToCassandra(context);
+ }
+
+ @OnDisabled
+ public void onDisabled(){
+ if (cassandraSession != null) {
+ cassandraSession.close();
+ }
+ if (cluster != null) {
+ cluster.close();
+ }
+ }
+
+ @OnStopped
+ public void onStopped() {
+ if (cassandraSession != null) {
+ cassandraSession.close();
+ }
+ if (cluster != null) {
+ cluster.close();
+ }
+ }
+
+ @Override
+ public Cluster getCluster() {
+ if (cluster != null) {
+ return cluster;
+ } else {
+ throw new ProcessException("Unable to get the Cassandra cluster
detail.");
+ }
+ }
+
+ @Override
+ public Session getCassandraSession() {
+ if (cassandraSession != null) {
+ return cassandraSession;
+ } else {
+ throw new ProcessException("Unable to get the Cassandra session.");
+ }
+ }
+
+ private void connectToCassandra(ConfigurationContext context) {
+ if (cluster == null) {
+ ComponentLog log = getLogger();
+ final String contactPointList =
context.getProperty(CONTACT_POINTS).evaluateAttributeExpressions().getValue();
+ final String consistencyLevel =
context.getProperty(CONSISTENCY_LEVEL).getValue();
+ final String compressionType =
context.getProperty(COMPRESSION_TYPE).getValue();
+
+ List<InetSocketAddress> contactPoints =
getContactPoints(contactPointList);
+
+ // Set up the client for secure (SSL/TLS communications) if
configured to do so
+ final SSLContextService sslService =
+
context.getProperty(PROP_SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
+ final String rawClientAuth =
context.getProperty(CLIENT_AUTH).getValue();
+ final SSLContext sslContext;
+
+ if (sslService != null) {
+ final SSLContextService.ClientAuth clientAuth;
+ if (StringUtils.isBlank(rawClientAuth)) {
+ clientAuth = SSLContextService.ClientAuth.REQUIRED;
+ } else {
+ try {
+ clientAuth =
SSLContextService.ClientAuth.valueOf(rawClientAuth);
+ } catch (final IllegalArgumentException iae) {
+ throw new
ProviderCreationException(String.format("Unrecognized client auth '%s'.
Possible values are [%s]",
+ rawClientAuth,
StringUtils.join(SslContextFactory.ClientAuth.values(), ", ")));
+ }
+ }
+ sslContext = sslService.createSSLContext(clientAuth);
+ } else {
+ sslContext = null;
+ }
+
+ final String username, password;
+ PropertyValue usernameProperty =
context.getProperty(USERNAME).evaluateAttributeExpressions();
+ PropertyValue passwordProperty =
context.getProperty(PASSWORD).evaluateAttributeExpressions();
+
+ if (usernameProperty != null && passwordProperty != null) {
+ username = usernameProperty.getValue();
+ password = passwordProperty.getValue();
+ } else {
+ username = null;
+ password = null;
+ }
+
+ // Create the cluster and connect to it
+ Cluster newCluster = createCluster(contactPoints, sslContext,
username, password, compressionType);
+ PropertyValue keyspaceProperty =
context.getProperty(KEYSPACE).evaluateAttributeExpressions();
+ final Session newSession;
+ if (keyspaceProperty != null) {
+ newSession = newCluster.connect(keyspaceProperty.getValue());
+ } else {
+ newSession = newCluster.connect();
+ }
+
newCluster.getConfiguration().getQueryOptions().setConsistencyLevel(ConsistencyLevel.valueOf(consistencyLevel));
+ Metadata metadata = newCluster.getMetadata();
+ log.info("Connected to Cassandra cluster: {}", new
Object[]{metadata.getClusterName()});
+
+ cluster = newCluster;
+ cassandraSession = newSession;
+ }
+ }
+
+ private List<InetSocketAddress> getContactPoints(String contactPointList) {
+
+ if (contactPointList == null) {
+ return null;
+ }
+
+ final List<String> contactPointStringList =
Arrays.asList(contactPointList.split(","));
+ List<InetSocketAddress> contactPoints = new ArrayList<>();
+
+ for (String contactPointEntry : contactPointStringList) {
+ String[] addresses = contactPointEntry.split(":");
+ final String hostName = addresses[0].trim();
+ final int port = (addresses.length > 1) ?
Integer.parseInt(addresses[1].trim()) : DEFAULT_CASSANDRA_PORT;
+
+ contactPoints.add(new InetSocketAddress(hostName, port));
+ }
+
+ return contactPoints;
+ }
+
+ private Cluster createCluster(List<InetSocketAddress> contactPoints,
SSLContext sslContext,
+ String username, String password, String
compressionType) {
+ Cluster.Builder builder =
Cluster.builder().addContactPointsWithPorts(contactPoints);
+
+ if (sslContext != null) {
+ JdkSSLOptions sslOptions = JdkSSLOptions.builder()
+ .withSSLContext(sslContext)
+ .build();
+ builder = builder.withSSL(sslOptions);
+ }
+
+ if (username != null && password != null) {
+ builder = builder.withCredentials(username, password);
+ }
+
+ if(ProtocolOptions.Compression.SNAPPY.equals(compressionType)) {
+ builder =
builder.withCompression(ProtocolOptions.Compression.SNAPPY);
+ } else if(ProtocolOptions.Compression.LZ4.equals(compressionType)) {
+ builder = builder.withCompression(ProtocolOptions.Compression.LZ4);
+ }
+
+ return builder.build();
+ }
+}
diff --git
a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-services/src/test/java/org/apache/nifi/service/TestCassandraSessionProvider.java
b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-services/src/test/java/org/apache/nifi/service/TestCassandraSessionProvider.java
index 2b688bc..2f49d02 100644
---
a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-services/src/test/java/org/apache/nifi/service/TestCassandraSessionProvider.java
+++
b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-services/src/test/java/org/apache/nifi/service/TestCassandraSessionProvider.java
@@ -1,59 +1,59 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.service;
-
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.reporting.InitializationException;
-import org.apache.nifi.util.TestRunner;
-import org.apache.nifi.util.TestRunners;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import java.util.List;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-public class TestCassandraSessionProvider {
-
- private static TestRunner runner;
- private static CassandraSessionProvider sessionProvider;
-
- @BeforeClass
- public static void setup() throws InitializationException {
- MockCassandraProcessor mockCassandraProcessor = new
MockCassandraProcessor();
- sessionProvider = new CassandraSessionProvider();
-
- runner = TestRunners.newTestRunner(mockCassandraProcessor);
- runner.addControllerService("cassandra-session-provider",
sessionProvider);
- }
-
- @Test
- public void testGetPropertyDescriptors() {
- List<PropertyDescriptor> properties =
sessionProvider.getPropertyDescriptors();
-
- assertEquals(7, properties.size());
- assertTrue(properties.contains(CassandraSessionProvider.CLIENT_AUTH));
-
assertTrue(properties.contains(CassandraSessionProvider.CONSISTENCY_LEVEL));
-
assertTrue(properties.contains(CassandraSessionProvider.CONTACT_POINTS));
- assertTrue(properties.contains(CassandraSessionProvider.KEYSPACE));
- assertTrue(properties.contains(CassandraSessionProvider.PASSWORD));
-
assertTrue(properties.contains(CassandraSessionProvider.PROP_SSL_CONTEXT_SERVICE));
- assertTrue(properties.contains(CassandraSessionProvider.USERNAME));
- }
-
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.service;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestCassandraSessionProvider {
+
+ private static TestRunner runner;
+ private static CassandraSessionProvider sessionProvider;
+
+ @BeforeClass
+ public static void setup() throws InitializationException {
+ MockCassandraProcessor mockCassandraProcessor = new
MockCassandraProcessor();
+ sessionProvider = new CassandraSessionProvider();
+
+ runner = TestRunners.newTestRunner(mockCassandraProcessor);
+ runner.addControllerService("cassandra-session-provider",
sessionProvider);
+ }
+
+ @Test
+ public void testGetPropertyDescriptors() {
+ List<PropertyDescriptor> properties =
sessionProvider.getPropertyDescriptors();
+
+ assertEquals(8, properties.size());
+ assertTrue(properties.contains(CassandraSessionProvider.CLIENT_AUTH));
+
assertTrue(properties.contains(CassandraSessionProvider.CONSISTENCY_LEVEL));
+
assertTrue(properties.contains(CassandraSessionProvider.CONTACT_POINTS));
+ assertTrue(properties.contains(CassandraSessionProvider.KEYSPACE));
+ assertTrue(properties.contains(CassandraSessionProvider.PASSWORD));
+
assertTrue(properties.contains(CassandraSessionProvider.PROP_SSL_CONTEXT_SERVICE));
+ assertTrue(properties.contains(CassandraSessionProvider.USERNAME));
+ }
+
+}