Give it a try and see how it behaves.

On Mar 15, 2017 10:09 AM, "Frank Hughes" <frankhughes...@gmail.com> wrote:

> Thanks Ryan, appreciated again. getPolicy just had this:
>
> Policy policy = new TokenAwarePolicy(DCAwareRoundRobinPolicy.builder().build());
>
> so I guess I need:
>
> Policy policy = new TokenAwarePolicy(DCAwareRoundRobinPolicy.builder().build(), false);
>
> Frank
>
> On 2017-03-15 13:45 (-0000), Ryan Svihla <r...@foundev.pro> wrote:
> > I don't see what getPolicy is returning, but you want to use TokenAwarePolicy
> > with the shuffle option set to false in the constructor. It defaults to
> > shuffle true, so that load is spread across replicas when people have
> > horribly fat partitions.
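A driver-free sketch of what that shuffle flag changes (illustrative only; the real behavior lives inside the driver's TokenAwarePolicy, and the method name here is made up): with shuffling off, the query plan keeps the replicas in ring order, so the primary owner is tried first.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Random;

// Illustrative sketch of the shuffle flag: with shuffle=false the
// replica list is kept in ring order, so the primary replica is always
// tried first; with shuffle=true the order is randomized to spread load.
public class ShuffleSketch {
    static List<String> queryPlan(List<String> replicasInRingOrder,
                                  boolean shuffle, Random rnd) {
        List<String> plan = new ArrayList<>(replicasInRingOrder);
        if (shuffle) {
            Collections.shuffle(plan, rnd);
        }
        return plan;
    }

    public static void main(String[] args) {
        List<String> replicas = Arrays.asList("primary", "replica2");
        // shuffle=false: the primary owner stays first in the plan
        System.out.println(queryPlan(replicas, false, new Random()));
    }
}
```

With shuffle=false the plan is deterministic, which is what makes the "pin work to the local primary" approach discussed in this thread possible.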
> >
> > On Wed, Mar 15, 2017 at 9:41 AM, Frank Hughes <frankhughes...@gmail.com>
> > wrote:
> >
> > > Thanks for reply. Much appreciated.
> > >
> > > I should have included more detail. I am using replication factor 2,
> > > and the code uses a token-aware method of distributing the work, so that
> > > only data primarily owned by a node is read on that local machine. So I
> > > guess this points to the logic I'm using to determine what is primarily
> > > owned by a node. This is verging into something that should be posted to
> > > the Java driver list, but I'll post it here in case it's useful or
> > > there's an obvious problem:
> > >
> > > PoolingOptions poolingOpts = new PoolingOptions();
> > > poolingOpts.setCoreConnectionsPerHost(HostDistance.REMOTE, this.coreConn);
> > > poolingOpts.setMaxConnectionsPerHost(HostDistance.REMOTE, this.maxConn);
> > > poolingOpts.setMaxRequestsPerConnection(HostDistance.LOCAL, 32768);
> > > poolingOpts.setMaxRequestsPerConnection(HostDistance.REMOTE, 2000);
> > >
> > > SocketOptions socketOptions = new SocketOptions();
> > > socketOptions.setReadTimeoutMillis(15000);
> > >
> > > Cluster.Builder builder = Cluster.builder();
> > > for(String contactPoint: contactPoints){
> > >     builder.addContactPoint(contactPoint.trim());
> > > }
> > > builder.withPoolingOptions(poolingOpts);
> > > builder.withSocketOptions(socketOptions);
> > >
> > > builder.withLoadBalancingPolicy(getPolicy())
> > >         .withQueryOptions(new QueryOptions()
> > >                 .setPrepareOnAllHosts(true)
> > >                 .setMetadataEnabled(true)
> > >         );
> > >
> > > Cluster cluster = builder.build();
> > > Metadata metadata = cluster.getMetadata();
> > > Session session = cluster.connect(keyspaceName);
> > > Set<Host> allHosts = metadata.getAllHosts();
> > >
> > > Host localHost = null;
> > > for (Host host : allHosts) {
> > >     if(host.getAddress().getHostAddress().equalsIgnoreCase(local))
> > >         localHost = host;
> > > }
> > >
> > > Map<Host, List<TokenRange>> replicaCount = new HashMap<Host, List<TokenRange>>();
> > > TokenRange[] tokenRanges = unwrapTokenRanges(metadata.getTokenRanges()).toArray(new TokenRange[0]);
> > >
> > > List<TokenRange> tokenRangeList = Arrays.asList(tokenRanges);
> > > tokenRangeList.sort(new Comparator<TokenRange>() {
> > >     @Override
> > >     public int compare(TokenRange o1, TokenRange o2) {
> > >         return o1.getStart().compareTo(o2.getStart());
> > >     }
> > > });
> > >
> > > int numberOfHost = metadata.getAllHosts().size();
> > > int rangesPerHost = tokenRanges.length / numberOfHost;
> > >
> > > for(TokenRange tokenRange : tokenRangeList){
> > >
> > >     Set<Host> hosts = metadata.getReplicas(keyspaceName, tokenRange);
> > >
> > >     String rangeHosts = "";
> > >     Iterator<Host> iter = hosts.iterator();
> > >     while(iter.hasNext()){
> > >         Host host = iter.next();
> > >
> > >         List<TokenRange> tokenRangesForHost = replicaCount.get(host);
> > >         if(tokenRangesForHost == null){
> > >             tokenRangesForHost = new ArrayList<TokenRange>();
> > >         }
> > >
> > >         if(tokenRangesForHost.size() < rangesPerHost || !iter.hasNext()){
> > >             tokenRangesForHost.add(tokenRange);
> > >             replicaCount.put(host, tokenRangesForHost);
> > >             break;
> > >         }
> > >
> > >         rangeHosts += host.getAddress().toString();
> > >     }
> > > }
> > >
> > > for(Host replica : replicaCount.keySet()){
> > >     List<TokenRange> allocatedRanges = replicaCount.get(replica);
> > >     for(TokenRange tr : allocatedRanges){
> > >         System.out.println(tr.getStart() + " to " + tr.getEnd());
> > >     }
> > > }
> > >
> > > //get a list of token ranges for this host
> > > List<TokenRange> tokenRangesForHost = replicaCount.get(localHost);
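A minimal, driver-free sketch of the splitting logic above, with plain longs standing in for TokenRange and strings for Host (all names here are hypothetical, not driver API):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Driver-free sketch of the even-split idea: each sorted range goes to
// the first of its replicas that is still under its fair share
// (ranges / hosts), so every range ends up with exactly one processing
// node and no host takes far more than its quota.
public class RangeSplitter {
    static Map<String, List<Long>> assign(List<Long> sortedRanges,
                                          Map<Long, List<String>> replicasByRange,
                                          int hostCount) {
        int perHost = sortedRanges.size() / hostCount;
        Map<String, List<Long>> owned = new HashMap<>();
        for (Long range : sortedRanges) {
            List<String> replicas = replicasByRange.get(range);
            for (int i = 0; i < replicas.size(); i++) {
                List<Long> mine = owned.computeIfAbsent(replicas.get(i),
                        h -> new ArrayList<>());
                // Take the range if this replica is under quota, or if it
                // is the last replica (so no range is ever dropped).
                if (mine.size() < perHost || i == replicas.size() - 1) {
                    mine.add(range);
                    break;
                }
            }
        }
        return owned;
    }

    public static void main(String[] args) {
        Map<Long, List<String>> replicas = new HashMap<>();
        replicas.put(0L, Arrays.asList("a", "b"));
        replicas.put(10L, Arrays.asList("b", "a"));
        System.out.println(assign(Arrays.asList(0L, 10L), replicas, 2));
    }
}
```

Note the original code has the same shape, but it `break`s out of the replica loop as soon as one host takes the range, which is what keeps the assignment disjoint.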
> > >
> > > Again, any thoughts are much appreciated.
> > >
> > > Thanks
> > >
> > > Frank
> > >
> > >
> > > On 2017-03-15 12:38 (-0000), Ryan Svihla <r...@foundev.pro> wrote:
> > > > LOCAL_ONE just means local to the datacenter. By default the
> > > > token-aware policy will go to a replica that owns that data (whether
> > > > the primary or any replica depends on the driver), and that may or may
> > > > not be the node the driver process is running on.
> > > >
> > > > To put this more concretely: if you have RF 2 with that 4-node
> > > > cluster, 2 nodes will be responsible for that data, and if your local
> > > > process is not running on one of those 2 nodes it will definitely HAVE
> > > > to go to another node.
> > > >
> > > > Therefore, if you wanted to pin behavior to a local replica, you'd
> > > > have to send your work out in a token-aware fashion where said work
> > > > only goes to the primary token owner of that data, and remove any
> > > > shuffling of replicas in the process (shuffling is on by default in
> > > > the Java driver, to my knowledge).
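One way to express that pinning (a sketch under the assumption that the first replica in ring order is the primary owner; the hostnames and method name are made up for illustration):

```java
import java.util.Arrays;
import java.util.List;

// Sketch: a node only processes work whose primary replica (the first
// host in ring order) is itself; every other range is left for the
// node that primarily owns it.
public class LocalPin {
    static boolean shouldProcessLocally(List<String> replicasInRingOrder,
                                        String localHost) {
        return !replicasInRingOrder.isEmpty()
                && replicasInRingOrder.get(0).equals(localHost);
    }

    public static void main(String[] args) {
        List<String> replicas = Arrays.asList("10.0.0.1", "10.0.0.2");
        System.out.println(shouldProcessLocally(replicas, "10.0.0.1")); // primary owner
        System.out.println(shouldProcessLocally(replicas, "10.0.0.2")); // non-primary replica
    }
}
```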
> > > >
> > > > On Wed, Mar 15, 2017 at 6:38 AM, Frank Hughes <
> frankhughes...@gmail.com>
> > > > wrote:
> > > >
> > > > > Hi there,
> > > > >
> > > > > I'm running a Java process on a 4-node Cassandra 3.9 cluster on EC2
> > > > > (instance type t2.2xlarge), with the process running separately on
> > > > > each of the nodes (i.e. 4 running JVMs).
> > > > > The process is just doing reads from Cassandra and building a Solr
> > > > > index, using the Java driver with consistency level LOCAL_ONE.
> > > > > However, the following exception is thrown:
> > > > >
> > > > > com.datastax.driver.core.exceptions.TransportException: [/10.0.0.2:9042] Connection has been closed
> > > > >         at com.datastax.driver.core.exceptions.TransportException.copy(TransportException.java:38)
> > > > >         at com.datastax.driver.core.exceptions.TransportException.copy(TransportException.java:24)
> > > > >         at com.datastax.driver.core.DriverThrowables.propagateCause(DriverThrowables.java:37)
> > > > >         at com.datastax.driver.core.ArrayBackedResultSet$MultiPage.prepareNextRow(ArrayBackedResultSet.java:313)
> > > > >         at com.datastax.driver.core.ArrayBackedResultSet$MultiPage.isExhausted(ArrayBackedResultSet.java:269)
> > > > >         at com.datastax.driver.core.ArrayBackedResultSet$1.hasNext(ArrayBackedResultSet.java:143)
> > > > >
> > > > > where 10.0.0.2 is not the local machine. So my questions:
> > > > >
> > > > > - Should this happen when I'm using consistency level LOCAL_ONE and
> > > > > just doing reads?
> > > > > - Does this suggest non-local reads are happening?
> > > > >
> > > > > Many thanks for any help/ideas.
> > > > >
> > > > > Frank
> > > > >
> > > > >
> > > > >
> > > >
> > > >
> > > > --
> > > >
> > > > Thanks,
> > > > Ryan Svihla
> > > >
> > >
> >
> >
> >
> > --
> >
> > Thanks,
> > Ryan Svihla
> >
>
