Repository: nifi Updated Branches: refs/heads/master 75bb4bfaa -> 6cbc58543
NIFI-1998: Upgraded Cassandra driver to 3.0.2 This closes #521. Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/6cbc5854 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/6cbc5854 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/6cbc5854 Branch: refs/heads/master Commit: 6cbc585438572e440c07722dc8c13d6be63e9966 Parents: 75bb4bf Author: Matt Burgess <[email protected]> Authored: Fri Jun 10 15:25:09 2016 -0400 Committer: Pierre Villard <[email protected]> Committed: Mon Jun 13 23:28:14 2016 +0200 ---------------------------------------------------------------------- .../nifi-cassandra-processors/pom.xml | 2 +- .../cassandra/AbstractCassandraProcessor.java | 21 +++++++++---- .../processors/cassandra/PutCassandraQL.java | 32 ++++++++++++-------- .../AbstractCassandraProcessorTest.java | 2 +- .../cassandra/CassandraQueryTestUtil.java | 9 ++++-- .../cassandra/PutCassandraQLTest.java | 2 +- .../cassandra/QueryCassandraTest.java | 2 +- 7 files changed, 44 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/6cbc5854/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/pom.xml b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/pom.xml index 68e1eeb..a341fa0 100644 --- a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/pom.xml +++ b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/pom.xml @@ -41,7 +41,7 @@ <dependency> <groupId>com.datastax.cassandra</groupId> <artifactId>cassandra-driver-core</artifactId> - <version>2.1.9</version> + <version>3.0.2</version> </dependency> <dependency> <groupId>org.apache.nifi</groupId> http://git-wip-us.apache.org/repos/asf/nifi/blob/6cbc5854/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/AbstractCassandraProcessor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/AbstractCassandraProcessor.java b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/AbstractCassandraProcessor.java index 3f93b81..d703ee8 100644 --- a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/AbstractCassandraProcessor.java +++ b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/AbstractCassandraProcessor.java @@ -17,12 +17,14 @@ package org.apache.nifi.processors.cassandra; import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.CodecRegistry; import com.datastax.driver.core.ConsistencyLevel; import com.datastax.driver.core.DataType; +import com.datastax.driver.core.JdkSSLOptions; import com.datastax.driver.core.Metadata; import com.datastax.driver.core.Row; -import com.datastax.driver.core.SSLOptions; import com.datastax.driver.core.Session; +import com.datastax.driver.core.TypeCodec; import org.apache.avro.Schema; import org.apache.avro.SchemaBuilder; import org.apache.commons.lang3.StringUtils; @@ -167,6 +169,8 @@ public abstract class AbstractCassandraProcessor extends AbstractProcessor { protected final AtomicReference<Cluster> cluster = new AtomicReference<>(null); protected final AtomicReference<Session> cassandraSession = new AtomicReference<>(null); + protected static final CodecRegistry codecRegistry = new CodecRegistry(); + @Override protected Collection<ValidationResult> customValidate(ValidationContext validationContext) { Set<ValidationResult> results = new HashSet<>(); @@ -253,7 +257,10 @@ public abstract class AbstractCassandraProcessor extends AbstractProcessor { String username, String password) { Cluster.Builder builder = Cluster.builder().addContactPointsWithPorts(contactPoints); if (sslContext != null) { - builder = builder.withSSL(new SSLOptions(sslContext, SSLOptions.DEFAULT_SSL_CIPHER_SUITES)); + JdkSSLOptions sslOptions = JdkSSLOptions.builder() + .withSSLContext(sslContext) + .build(); + builder = builder.withSSL(sslOptions); } if (username != null && password != null) { builder = builder.withCredentials(username, password); @@ -315,15 +322,17 @@ public abstract class AbstractCassandraProcessor extends AbstractProcessor { } // Get the first type argument, to be used for lists and sets (and the first in a map) DataType firstArg = typeArguments.get(0); + TypeCodec firstCodec = codecRegistry.codecFor(firstArg); if (dataType.equals(DataType.set(firstArg))) { - return row.getSet(i, firstArg.asJavaClass()); + return row.getSet(i, firstCodec.getJavaType()); } else if (dataType.equals(DataType.list(firstArg))) { - return row.getList(i, firstArg.asJavaClass()); + return row.getList(i, firstCodec.getJavaType()); } else { // Must be an n-arg collection like map DataType secondArg = typeArguments.get(1); + TypeCodec secondCodec = codecRegistry.codecFor(secondArg); if (dataType.equals(DataType.map(firstArg, secondArg))) { - return row.getMap(i, firstArg.asJavaClass(), secondArg.asJavaClass()); + return row.getMap(i, firstCodec.getJavaType(), secondCodec.getJavaType()); } } @@ -427,7 +436,7 @@ public abstract class AbstractCassandraProcessor extends AbstractProcessor { return primitiveType; } } - throw new IllegalArgumentException("Not a primitive Cassandra type: " + dataTypeName); + return null; } /** http://git-wip-us.apache.org/repos/asf/nifi/blob/6cbc5854/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraQL.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraQL.java b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraQL.java index 8c0908a..4af3aa7 100644 --- a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraQL.java +++ b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraQL.java @@ -21,6 +21,7 @@ import com.datastax.driver.core.DataType; import com.datastax.driver.core.PreparedStatement; import com.datastax.driver.core.ResultSetFuture; import com.datastax.driver.core.Session; +import com.datastax.driver.core.TypeCodec; import com.datastax.driver.core.exceptions.AuthenticationException; import com.datastax.driver.core.exceptions.InvalidTypeException; import com.datastax.driver.core.exceptions.NoHostAvailableException; @@ -119,7 +120,6 @@ public class PutCassandraQL extends AbstractCassandraProcessor { // Matches on top-level type (primitive types like text,int) and also for collections (like list<boolean> and map<float,double>) private static final Pattern CQL_TYPE_PATTERN = Pattern.compile("([^<]+)(<([^,>]+)(,([^,>]+))*>)?"); - /* * Will ensure that the list of property descriptors is build only once. * Will also create a Set of relationships @@ -310,9 +310,9 @@ public class PutCassandraQL extends AbstractCassandraProcessor { // If the matcher doesn't match, this should fall through to the exception at the bottom if (matcher.find() && matcher.groupCount() > 1) { String mainTypeString = matcher.group(1).toLowerCase(); - DataType.Name mainTypeName = DataType.Name.valueOf(mainTypeString.toUpperCase()); - if (!mainTypeName.isCollection()) { - DataType mainType = getPrimitiveDataTypeFromString(mainTypeString); + DataType mainType = getPrimitiveDataTypeFromString(mainTypeString); + if (mainType != null) { + TypeCodec typeCodec = codecRegistry.codecFor(mainType); // Need the right statement.setXYZ() method if (mainType.equals(DataType.ascii()) @@ -327,23 +327,23 @@ public class PutCassandraQL extends AbstractCassandraProcessor { statement.setString(paramIndex, paramValue); } else if (mainType.equals(DataType.cboolean())) { - statement.setBool(paramIndex, (boolean) mainType.parse(paramValue)); + statement.setBool(paramIndex, (boolean) typeCodec.parse(paramValue)); } else if (mainType.equals(DataType.cint())) { - statement.setInt(paramIndex, (int) mainType.parse(paramValue)); + statement.setInt(paramIndex, (int) typeCodec.parse(paramValue)); } else if (mainType.equals(DataType.bigint()) || mainType.equals(DataType.counter())) { - statement.setLong(paramIndex, (long) mainType.parse(paramValue)); + statement.setLong(paramIndex, (long) typeCodec.parse(paramValue)); } else if (mainType.equals(DataType.cfloat())) { - statement.setFloat(paramIndex, (float) mainType.parse(paramValue)); + statement.setFloat(paramIndex, (float) typeCodec.parse(paramValue)); } else if (mainType.equals(DataType.cdouble())) { - statement.setDouble(paramIndex, (double) mainType.parse(paramValue)); + statement.setDouble(paramIndex, (double) typeCodec.parse(paramValue)); } else if (mainType.equals(DataType.blob())) { - statement.setBytes(paramIndex, (ByteBuffer) mainType.parse(paramValue)); + statement.setBytes(paramIndex, (ByteBuffer) typeCodec.parse(paramValue)); } return; @@ -352,22 +352,28 @@ public class PutCassandraQL extends AbstractCassandraProcessor { if (matcher.groupCount() > 2) { String firstParamTypeName = matcher.group(3); DataType firstParamType = getPrimitiveDataTypeFromString(firstParamTypeName); + if (firstParamType == null) { + throw new IllegalArgumentException("Nested collections are not supported"); + } // Check for map type if (DataType.Name.MAP.toString().equalsIgnoreCase(mainTypeString)) { if (matcher.groupCount() > 4) { String secondParamTypeName = matcher.group(5); DataType secondParamType = getPrimitiveDataTypeFromString(secondParamTypeName); - statement.setMap(paramIndex, (Map) DataType.map(firstParamType, secondParamType).parse(paramValue)); + DataType mapType = DataType.map(firstParamType, secondParamType); + statement.setMap(paramIndex, (Map) codecRegistry.codecFor(mapType).parse(paramValue)); return; } } else { // Must be set or list if (DataType.Name.SET.toString().equalsIgnoreCase(mainTypeString)) { - statement.setSet(paramIndex, (Set) DataType.set(firstParamType).parse(paramValue)); + DataType setType = DataType.set(firstParamType); + statement.setSet(paramIndex, (Set) codecRegistry.codecFor(setType).parse(paramValue)); return; } else if (DataType.Name.LIST.toString().equalsIgnoreCase(mainTypeString)) { - statement.setList(paramIndex, (List) DataType.list(firstParamType).parse(paramValue)); + DataType listType = DataType.list(firstParamType); + statement.setList(paramIndex, (List) codecRegistry.codecFor(listType).parse(paramValue)); return; } } http://git-wip-us.apache.org/repos/asf/nifi/blob/6cbc5854/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/AbstractCassandraProcessorTest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/AbstractCassandraProcessorTest.java b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/AbstractCassandraProcessorTest.java index 19e2320..3b0e273 100644 --- a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/AbstractCassandraProcessorTest.java +++ b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/AbstractCassandraProcessorTest.java @@ -270,7 +270,7 @@ public class AbstractCassandraProcessorTest { Metadata mockMetadata = mock(Metadata.class); when(mockMetadata.getClusterName()).thenReturn("cluster1"); when(mockCluster.getMetadata()).thenReturn(mockMetadata); - Configuration config = new Configuration(); + Configuration config = Configuration.builder().build(); when(mockCluster.getConfiguration()).thenReturn(config); return mockCluster; } http://git-wip-us.apache.org/repos/asf/nifi/blob/6cbc5854/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/CassandraQueryTestUtil.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/CassandraQueryTestUtil.java b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/CassandraQueryTestUtil.java index 17f2c27..0d5571e 100644 --- a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/CassandraQueryTestUtil.java +++ b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/CassandraQueryTestUtil.java @@ -21,6 +21,7 @@ import com.datastax.driver.core.DataType; import com.datastax.driver.core.ResultSet; import com.datastax.driver.core.Row; import com.google.common.collect.Sets; +import com.google.common.reflect.TypeToken; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -34,7 +35,9 @@ import java.util.Map; import java.util.Set; import java.util.TimeZone; +import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyInt; +import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -107,9 +110,9 @@ public class CassandraQueryTestUtil { when(row.getString(0)).thenReturn(user_id); when(row.getString(1)).thenReturn(first_name); when(row.getString(2)).thenReturn(last_name); - when(row.getSet(3, String.class)).thenReturn(emails); - when(row.getList(4, String.class)).thenReturn(top_places); - when(row.getMap(5, Date.class, String.class)).thenReturn(todo); + when(row.getSet(eq(3), any(TypeToken.class))).thenReturn(emails); + when(row.getList(eq(4), any(TypeToken.class))).thenReturn(top_places); + when(row.getMap(eq(5), any(TypeToken.class), any(TypeToken.class))).thenReturn(todo); when(row.getBool(6)).thenReturn(registered); when(row.getFloat(7)).thenReturn(scale); when(row.getDouble(8)).thenReturn(metric); http://git-wip-us.apache.org/repos/asf/nifi/blob/6cbc5854/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/PutCassandraQLTest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/PutCassandraQLTest.java b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/PutCassandraQLTest.java index 17e52dd..1a40556 100644 --- a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/PutCassandraQLTest.java +++ b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/PutCassandraQLTest.java @@ -180,7 +180,7 @@ public class PutCassandraQLTest { when(mockCluster.getMetadata()).thenReturn(mockMetadata); when(mockCluster.connect()).thenReturn(mockSession); when(mockCluster.connect(anyString())).thenReturn(mockSession); - Configuration config = new Configuration(); + Configuration config = Configuration.builder().build(); when(mockCluster.getConfiguration()).thenReturn(config); ResultSetFuture future = mock(ResultSetFuture.class); ResultSet rs = CassandraQueryTestUtil.createMockResultSet(); http://git-wip-us.apache.org/repos/asf/nifi/blob/6cbc5854/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/QueryCassandraTest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/QueryCassandraTest.java b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/QueryCassandraTest.java index 444da4d..023f239 100644 --- a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/QueryCassandraTest.java +++ b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/QueryCassandraTest.java @@ -340,7 +340,7 @@ public class QueryCassandraTest { Session mockSession = mock(Session.class); when(mockCluster.connect()).thenReturn(mockSession); when(mockCluster.connect(anyString())).thenReturn(mockSession); - Configuration config = new Configuration(); + Configuration config = Configuration.builder().build(); when(mockCluster.getConfiguration()).thenReturn(config); ResultSetFuture future = mock(ResultSetFuture.class); ResultSet rs = CassandraQueryTestUtil.createMockResultSet();
