Repository: camel Updated Branches: refs/heads/master 90121e537 -> aa0bb9277
CAMEL-9050 Camel-cassandraql: Add loadbalancingpolicy as uri option Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/aa0bb927 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/aa0bb927 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/aa0bb927 Branch: refs/heads/master Commit: aa0bb927747d2215273482775f1aa348eb0f2e5b Parents: 90121e5 Author: Andrea Cosentino <anco...@gmail.com> Authored: Wed Aug 5 17:50:06 2015 +0200 Committer: Andrea Cosentino <anco...@gmail.com> Committed: Wed Aug 5 17:55:53 2015 +0200 ---------------------------------------------------------------------- .../component/cassandra/CassandraEndpoint.java | 22 +++++++++ .../CassandraLoadBalancingPolicies.java | 48 ++++++++++++++++++++ .../CassandraComponentProducerTest.java | 21 +++++++++ 3 files changed, 91 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/aa0bb927/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraEndpoint.java b/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraEndpoint.java index 0e28d9f..95ab660 100644 --- a/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraEndpoint.java +++ b/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraEndpoint.java @@ -21,6 +21,8 @@ import com.datastax.driver.core.ConsistencyLevel; import com.datastax.driver.core.PreparedStatement; import com.datastax.driver.core.ResultSet; import com.datastax.driver.core.Session; +import com.datastax.driver.core.policies.LoadBalancingPolicy; + import org.apache.camel.Component; import org.apache.camel.Consumer; import org.apache.camel.Message; @@ -31,6 +33,7 @@ import org.apache.camel.spi.UriEndpoint; import org.apache.camel.spi.UriParam; import org.apache.camel.spi.UriPath; import org.apache.camel.util.CamelContextHelper; +import org.apache.camel.utils.cassandra.CassandraLoadBalancingPolicies; import org.apache.camel.utils.cassandra.CassandraSessionHolder; /** @@ -65,6 +68,8 @@ public class CassandraEndpoint extends DefaultEndpoint { private Session session; @UriParam private ConsistencyLevel consistencyLevel; + @UriParam + private String loadBalancingPolicy; /** * How many rows should be retrieved in message body @@ -140,6 +145,7 @@ public class CassandraEndpoint extends DefaultEndpoint { } protected Cluster.Builder createClusterBuilder() throws Exception { + CassandraLoadBalancingPolicies cassLoadBalancingPolicies = new CassandraLoadBalancingPolicies(); Cluster.Builder clusterBuilder = Cluster.builder(); for (String host : hosts.split(",")) { clusterBuilder = clusterBuilder.addContactPoint(host); @@ -153,6 +159,9 @@ public class CassandraEndpoint extends DefaultEndpoint { if (username != null && !username.isEmpty() && password != null) { clusterBuilder.withCredentials(username, password); } + if (loadBalancingPolicy != null && !loadBalancingPolicy.isEmpty()) { + clusterBuilder.withLoadBalancingPolicy(cassLoadBalancingPolicies.getLoadBalancingPolicy(loadBalancingPolicy)); + } return clusterBuilder; } @@ -337,4 +346,17 @@ public class CassandraEndpoint extends DefaultEndpoint { public void setPrepareStatements(boolean prepareStatements) { this.prepareStatements = prepareStatements; } + + /** + * To use a specific LoadBalancingPolicy + */ + public String getLoadBalancingPolicy() { + return loadBalancingPolicy; + } + + public void setLoadBalancingPolicy(String loadBalancingPolicy) { + this.loadBalancingPolicy = loadBalancingPolicy; + } + + } http://git-wip-us.apache.org/repos/asf/camel/blob/aa0bb927/components/camel-cassandraql/src/main/java/org/apache/camel/utils/cassandra/CassandraLoadBalancingPolicies.java ---------------------------------------------------------------------- diff --git a/components/camel-cassandraql/src/main/java/org/apache/camel/utils/cassandra/CassandraLoadBalancingPolicies.java b/components/camel-cassandraql/src/main/java/org/apache/camel/utils/cassandra/CassandraLoadBalancingPolicies.java new file mode 100644 index 0000000..30b735c --- /dev/null +++ b/components/camel-cassandraql/src/main/java/org/apache/camel/utils/cassandra/CassandraLoadBalancingPolicies.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.utils.cassandra; + +import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy; +import com.datastax.driver.core.policies.LoadBalancingPolicy; +import com.datastax.driver.core.policies.RoundRobinPolicy; +import com.datastax.driver.core.policies.TokenAwarePolicy; + +public class CassandraLoadBalancingPolicies { + + public final String roundRobinPolicy = "RoundRobinPolicy"; + public final String tokenAwarePolicy = "TokenAwarePolicy"; + public final String dcAwareRoundRobinPolicy = "DcAwareRoundRobinPolicy"; + + public LoadBalancingPolicy getLoadBalancingPolicy(String policy) { + LoadBalancingPolicy loadBalancingPolicy = new RoundRobinPolicy(); + switch (policy) { + case roundRobinPolicy: + loadBalancingPolicy = new RoundRobinPolicy(); + break; + case tokenAwarePolicy: + loadBalancingPolicy = new TokenAwarePolicy(new RoundRobinPolicy()); + break; + case dcAwareRoundRobinPolicy: + loadBalancingPolicy = new DCAwareRoundRobinPolicy(); + break; + default: + throw new IllegalArgumentException("Cassandra load balancing policy can be " + roundRobinPolicy + " ," + tokenAwarePolicy + + " ," + dcAwareRoundRobinPolicy); + } + return loadBalancingPolicy; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/aa0bb927/components/camel-cassandraql/src/test/java/org/apache/camel/component/cassandra/CassandraComponentProducerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-cassandraql/src/test/java/org/apache/camel/component/cassandra/CassandraComponentProducerTest.java b/components/camel-cassandraql/src/test/java/org/apache/camel/component/cassandra/CassandraComponentProducerTest.java index f5825ab..71f02e5 100644 --- a/components/camel-cassandraql/src/test/java/org/apache/camel/component/cassandra/CassandraComponentProducerTest.java +++ b/components/camel-cassandraql/src/test/java/org/apache/camel/component/cassandra/CassandraComponentProducerTest.java @@ -58,6 +58,9 @@ public class CassandraComponentProducerTest extends CamelTestSupport { @Produce(uri = "direct:inputNotConsistent") ProducerTemplate notConsistentProducerTemplate; + + @Produce(uri = "direct:loadBalancingPolicy") + ProducerTemplate loadBalancingPolicyTemplate; @BeforeClass public static void setUpClass() throws Exception { @@ -78,6 +81,8 @@ public class CassandraComponentProducerTest extends CamelTestSupport { .to("cql://localhost/camel_ks?cql=" + CQL); from("direct:inputNoParameter") .to("cql://localhost/camel_ks?cql=" + NO_PARAMETER_CQL); + from("direct:loadBalancingPolicy") + .to("cql://localhost/camel_ks?cql=" + NO_PARAMETER_CQL + "&loadBalancingPolicy=RoundRobinPolicy"); from("direct:inputNotConsistent") .to(NOT_CONSISTENT_URI); } @@ -132,6 +137,22 @@ public class CassandraComponentProducerTest extends CamelTestSupport { session.close(); cluster.close(); } + + @Test + public void testLoadBalancing() throws Exception { + Object response = loadBalancingPolicyTemplate.requestBodyAndHeader(new Object[]{"Claus 2", "Ibsen 2", "c_ibsen"}, + CassandraConstants.CQL_QUERY, "update camel_user set first_name=?, last_name=? where login=?"); + + Cluster cluster = CassandraUnitUtils.cassandraCluster(); + Session session = cluster.connect(CassandraUnitUtils.KEYSPACE); + ResultSet resultSet = session.execute("select login, first_name, last_name from camel_user where login = ?", "c_ibsen"); + Row row = resultSet.one(); + assertNotNull(row); + assertEquals("Claus 2", row.getString("first_name")); + assertEquals("Ibsen 2", row.getString("last_name")); + session.close(); + cluster.close(); + } /** * Test with incoming message containing a header with RegularStatement.