give it a try see how it behaves On Mar 15, 2017 10:09 AM, "Frank Hughes" <frankhughes...@gmail.com> wrote:
> Thanks Ryan, appreciated again. getPolicy just had this: > > Policy policy = new TokenAwarePolicy(DCAwareRoundRobinPolicy. > builder().build()); > > so i guess i need > > Policy policy = new > TokenAwarePolicy(DCAwareRoundRobinPolicy.builder().build(), > false); > > Frank > > On 2017-03-15 13:45 (-0000), Ryan Svihla <r...@foundev.pro> wrote: > > I don't see what getPolicy is retrieving but you want to use TokenAware > > with the shuffle false option in the ctor, it defaults to shuffle true so > > that load is spread when people have horribly fat partitions. > > > > On Wed, Mar 15, 2017 at 9:41 AM, Frank Hughes <frankhughes...@gmail.com> > > wrote: > > > > > Thanks for reply. Much appreciated. > > > > > > I should have included more detail. So I am using replication factor 2, > > > and the code is using a token aware method of distributing the work so > that > > > only data that is primarily owned by the node is read on that local > > > machine. So i guess this points to the logic im using to determine > what is > > > primarily owned by a node. I guess this is verging into something that > > > should be posted to the java driver list, but i'll post here in case > its > > > useful or theres an obvious problem: > > > > > > PoolingOptions poolingOpts = new PoolingOptions(); > > > poolingOpts.setCoreConnectionsPerHost(HostDistance.REMOTE, > this.coreConn); > > > poolingOpts.setMaxConnectionsPerHost(HostDistance.REMOTE, > this.maxConn); > > > poolingOpts.setMaxRequestsPerConnection(HostDistance.LOCAL, 32768); > > > poolingOpts.setMaxRequestsPerConnection(HostDistance.REMOTE, 2000); > > > > > > SocketOptions socketOptions = new SocketOptions(); > > > socketOptions.setReadTimeoutMillis(15000); > > > > > > Cluster.Builder builder = Cluster.builder(); > > > for(String contactPoint: contactPoints){ > > > builder.addContactPoint(contactPoint.trim()); > > > builder.withPoolingOptions(poolingOpts); > > > builder.withSocketOptions(socketOptions); > > > } > > > > > > builder.withLoadBalancingPolicy(getPolicy()) > > > .withQueryOptions(new QueryOptions() > > > .setPrepareOnAllHosts(true) > > > .setMetadataEnabled(true) > > > ); > > > > > > Cluster cluster = builder.build(); > > > Metadata metadata = cluster.getMetadata(); > > > Session session = cluster.connect(keyspaceName); > > > Set<Host> allHosts = metadata.getAllHosts(); > > > int numberOfHost = 4; > > > > > > Host localHost = null; > > > for (Host host : allHosts) { > > > if(host.getAddress().getHostAddress().equalsIgnoreCase(local)) > > > localHost = host; > > > } > > > > > > Map<Host, List<TokenRange>> replicaCount = new HashMap<Host, > > > List<TokenRange>>(); > > > TokenRange[] tokenRanges = unwrapTokenRanges(metadata. > getTokenRanges()).toArray(new > > > TokenRange[0]); > > > > > > List<TokenRange> tokenRangeList = Arrays.asList(tokenRanges); > > > tokenRangeList.sort(new Comparator<TokenRange>() { > > > @Override > > > public int compare(TokenRange o1, TokenRange o2) { > > > return o1.getStart().compareTo(o2.getStart()); > > > } > > > }); > > > > > > int numberOfHost = metadata.getAllHosts().size(); > > > int rangesPerHost = tokenRanges.length / numberOfHost; > > > > > > for(TokenRange tokenRange : tokenRangeList){ > > > > > > Set<Host> hosts = metadata.getReplicas(keyspaceName, tokenRange); > > > > > > String rangeHosts = ""; > > > Iterator<Host> iter = hosts.iterator(); > > > while(iter.hasNext()){ > > > Host host = iter.next(); > > > > > > List<TokenRange> tokenRangesForHost = replicaCount.get(host); > > > if(tokenRangesForHost == null){ > > > tokenRangesForHost = new ArrayList<TokenRange>(); > > > } > > > > > > if(tokenRangesForHost.size() < rangesPerHost || > !iter.hasNext()){ > > > tokenRangesForHost.add(tokenRange); > > > replicaCount.put(host, tokenRangesForHost); > > > break; > > > } > > > > > > rangeHosts += host.getAddress().toString(); > > > } > > > } > > > > > > for(Host replica : replicaCount.keySet()){ > > > List<TokenRange> allocatedRanges = replicaCount.get(replica); > > > for(TokenRange tr : replicaCount.get(replica)){ > > > System.out.println(tr.getStart() + " to " + tr.getEnd()); > > > } > > > } > > > > > > //get a list of token ranges for this host > > > List<TokenRange> tokenRangesForHost = replicaCount.get(localHost); > > > > > > Again, any thoughts are much appreciated. > > > > > > Thanks > > > > > > Frank > > > > > > > > > On 2017-03-15 12:38 (-0000), Ryan Svihla <r...@foundev.pro> wrote: > > > > LOCAL_ONE just means local to the datacenter by default the > tokenaware > > > > policy will go to a replica that owns that data (primary or any > replica > > > > depends on the driver) and that may or may not be the node the driver > > > > process is running on. > > > > > > > > So to put this more concretely if you have RF 2 with that 4 node > cluster > > > so > > > > 2 nodes will be responsible for that data and if your local process > is > > > not > > > > running on one of those 2 nodes it will definitely HAVE to go to > another > > > > node. > > > > > > > > Therefore, if you wanted to pin behavior to a local replica you'd > have to > > > > send your work out in a token aware fashion where said work only > goes to > > > > the primary token owner of that data, and remove any shuffling of > > > replicas > > > > in the process (is only on by default in the java driver to my > > > knowledge). > > > > > > > > On Wed, Mar 15, 2017 at 6:38 AM, Frank Hughes < > frankhughes...@gmail.com> > > > > wrote: > > > > > > > > > Hi there, > > > > > > > > > > Im running a java process on a 4 node cassandra 3.9 cluster on EC2 > > > > > (instance type t2.2xlarge), the process running separately on each > of > > > the > > > > > nodes (i.e. 4 running JVMs). > > > > > The process is just doing reads from Cassandra and building a SOLR > > > index > > > > > and using the java driver with consistency level LOCAL_ONE. > > > > > However, the following exception is through: > > > > > > > > > > com.datastax.driver.core.exceptions.TransportException: [/ > > > 10.0.0.2:9042] > > > > > Connection has been closed > > > > > at com.datastax.driver.core.exceptions.TransportException. > > > > > copy(TransportException.java:38) > > > > > at com.datastax.driver.core.exceptions.TransportException. > > > > > copy(TransportException.java:24) > > > > > at com.datastax.driver.core.DriverThrowables. > propagateCause( > > > > > DriverThrowables.java:37) > > > > > at com.datastax.driver.core.ArrayBackedResultSet$ > > > > > MultiPage.prepareNextRow(ArrayBackedResultSet.java:313) > > > > > at com.datastax.driver.core.ArrayBackedResultSet$ > > > > > MultiPage.isExhausted(ArrayBackedResultSet.java:269) > > > > > at com.datastax.driver.core.ArrayBackedResultSet$1. > > > > > hasNext(ArrayBackedResultSet.java:143) > > > > > > > > > > where 10.0.0.2 is not the local machine. So my questions: > > > > > > > > > > - Should this happen when Im using consistency level LOCAL_ONE and > just > > > > > doing reads ? > > > > > - Does this suggest non-local reads are happening ? > > > > > > > > > > Many thanks for any help/ideas. > > > > > > > > > > Frank > > > > > > > > > > > > > > > > > > > > > > > > > > > -- > > > > > > > > Thanks, > > > > Ryan Svihla > > > > > > > > > > > > > > > -- > > > > Thanks, > > Ryan Svihla > > >