Repository: incubator-apex-malhar Updated Branches: refs/heads/master d9abee962 -> c829adaf1
APEXMALHAR-1948: CassandraStore Should Allow You To Specify Protocol Version. Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/84131850 Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/84131850 Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/84131850 Branch: refs/heads/master Commit: 84131850c4bea86d5d08ddbaa1254027ccba2492 Parents: 5373a3c Author: Priyanka Gugale <[email protected]> Authored: Wed Feb 10 15:14:08 2016 +0530 Committer: Priyanka Gugale <[email protected]> Committed: Fri Mar 18 15:32:20 2016 +0530 ---------------------------------------------------------------------- contrib/pom.xml | 9 +++- .../contrib/cassandra/CassandraStore.java | 49 ++++++++++++++++---- .../cassandra/CassandraOperatorTest.java | 29 +++++++++++- 3 files changed, 74 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/84131850/contrib/pom.xml ---------------------------------------------------------------------- diff --git a/contrib/pom.xml b/contrib/pom.xml index 687e9c9..4bbd8f5 100755 --- a/contrib/pom.xml +++ b/contrib/pom.xml @@ -486,7 +486,14 @@ <dependency> <groupId>com.datastax.cassandra</groupId> <artifactId>cassandra-driver-core</artifactId> - <version>2.0.2</version> + <version>2.1.8</version> + <optional>true</optional> + </dependency> + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + <version>14.0.1</version> + <scope>provided</scope> <optional>true</optional> </dependency> <dependency> http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/84131850/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraStore.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraStore.java b/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraStore.java index 49ed20c..5d9178c 100644 --- a/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraStore.java +++ b/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraStore.java @@ -24,9 +24,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.ProtocolVersion; import com.datastax.driver.core.Session; import com.datastax.driver.core.exceptions.DriverException; - import com.datatorrent.netlet.util.DTThrowable; import com.datatorrent.lib.db.Connectable; @@ -47,6 +47,7 @@ public class CassandraStore implements Connectable private String node; protected transient Cluster cluster = null; protected transient Session session = null; + private String protocolVersion; @NotNull protected String keyspace; @@ -90,6 +91,20 @@ public class CassandraStore implements Connectable this.password = password; } + public String getProtocolVersion() + { + return protocolVersion; + } + + /** + * Sets the protocolVersion of Cassandra + * @param protocolVersion as V1, V2, V3 etc + */ + public void setProtocolVersion(String protocolVersion) + { + this.protocolVersion = protocolVersion; + } + @NotNull public String getNode() { return node; @@ -115,21 +130,35 @@ public class CassandraStore implements Connectable /** * Creates a cluster object. */ - public void buildCluster(){ - + public void buildCluster() + { try { - - cluster = Cluster.builder() - .addContactPoint(node).withCredentials(userName, password).build(); - } - catch (DriverException ex) { + if (protocolVersion != null && protocolVersion.length() != 0) { + ProtocolVersion version = getCassandraProtocolVersion(); + cluster = Cluster.builder().addContactPoint(node).withCredentials(userName, password).withProtocolVersion(version).build(); + } else { + cluster = Cluster.builder().addContactPoint(node).withCredentials(userName, password).build(); + } + } catch (DriverException ex) { throw new RuntimeException("closing database resource", ex); - } - catch (Throwable t) { + } catch (Throwable t) { DTThrowable.rethrow(t); } } + private ProtocolVersion getCassandraProtocolVersion() + { + switch (protocolVersion.toUpperCase()) { + case "V1": + return ProtocolVersion.V1; + case "V2": + return ProtocolVersion.V2; + case "V3": + return ProtocolVersion.V3; + default: + throw new RuntimeException("Unsupported Cassandra Protocol Version."); + } + } /** * Create connection with database. http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/84131850/contrib/src/test/java/com/datatorrent/contrib/cassandra/CassandraOperatorTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/com/datatorrent/contrib/cassandra/CassandraOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/cassandra/CassandraOperatorTest.java index 68a1e5c..56d9857 100644 --- a/contrib/src/test/java/com/datatorrent/contrib/cassandra/CassandraOperatorTest.java +++ b/contrib/src/test/java/com/datatorrent/contrib/cassandra/CassandraOperatorTest.java @@ -24,19 +24,18 @@ import com.datatorrent.api.Attribute; import com.datatorrent.api.Attribute.AttributeMap; import com.datatorrent.api.Context; import com.datatorrent.api.DAG; - import com.datatorrent.lib.helper.TestPortContext; import com.datatorrent.lib.util.FieldInfo; import com.datatorrent.netlet.util.DTThrowable; import com.datatorrent.lib.helper.OperatorContextTestHelper; import com.datatorrent.lib.testbench.CollectorTestSink; import com.google.common.collect.Lists; + import java.util.*; import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; - import org.junit.AfterClass; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -219,6 +218,32 @@ public class CassandraOperatorTest } @Test + public void testCassandraProtocolVersion() + { + CassandraTransactionalStore transactionalStore = new CassandraTransactionalStore(); + transactionalStore.setNode(NODE); + transactionalStore.setKeyspace(KEYSPACE); + transactionalStore.setProtocolVersion("v2"); + + AttributeMap.DefaultAttributeMap attributeMap = new AttributeMap.DefaultAttributeMap(); + attributeMap.put(DAG.APPLICATION_ID, APP_ID); + OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(OPERATOR_ID, attributeMap); + + TestOutputOperator outputOperator = new TestOutputOperator(); + + outputOperator.setTablename(TABLE_NAME); + List<FieldInfo> fieldInfos = Lists.newArrayList(); + fieldInfos.add(new FieldInfo("id", "id", null)); + + outputOperator.setStore(transactionalStore); + outputOperator.setFieldInfos(fieldInfos); + outputOperator.setup(context); + + Configuration config = outputOperator.getStore().getCluster().getConfiguration(); + Assert.assertEquals("Procotol version was not set to V2.", ProtocolVersion.V2, config.getProtocolOptions().getProtocolVersionEnum()); + } + + @Test public void testCassandraOutputOperator() { CassandraTransactionalStore transactionalStore = new CassandraTransactionalStore();
