http://git-wip-us.apache.org/repos/asf/cassandra/blob/4881d9c3/src/java/org/apache/cassandra/service/CassandraDaemon.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java index b41cc00..0e88929 100644 --- a/src/java/org/apache/cassandra/service/CassandraDaemon.java +++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java @@ -64,7 +64,6 @@ import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.metrics.CassandraMetricsRegistry; import org.apache.cassandra.metrics.DefaultNameFactory; import org.apache.cassandra.metrics.StorageMetrics; -import org.apache.cassandra.thrift.ThriftServer; import org.apache.cassandra.tracing.Tracing; import org.apache.cassandra.utils.*; @@ -150,7 +149,6 @@ public class CassandraDaemon static final CassandraDaemon instance = new CassandraDaemon(); - public Server thriftServer; private NativeTransportService nativeTransportService; private JMXConnectorServer jmxServer; @@ -396,12 +394,6 @@ public class CassandraDaemon if (sizeRecorderInterval > 0) ScheduledExecutors.optionalTasks.scheduleWithFixedDelay(SizeEstimatesRecorder.instance, 30, sizeRecorderInterval, TimeUnit.SECONDS); - // Thrift - InetAddress rpcAddr = DatabaseDescriptor.getRpcAddress(); - int rpcPort = DatabaseDescriptor.getRpcPort(); - int listenBacklog = DatabaseDescriptor.getRpcListenBacklog(); - thriftServer = new ThriftServer(rpcAddr, rpcPort, listenBacklog); - // Native transport nativeTransportService = new NativeTransportService(); @@ -492,12 +484,6 @@ public class CassandraDaemon } else logger.info("Not starting native transport as requested. 
Use JMX (StorageService->startNativeTransport()) or nodetool (enablebinary) to start it"); - - String rpcFlag = System.getProperty("cassandra.start_rpc"); - if ((rpcFlag != null && Boolean.parseBoolean(rpcFlag)) || (rpcFlag == null && DatabaseDescriptor.startRpc())) - thriftServer.start(); - else - logger.info("Not starting RPC server as requested. Use JMX (StorageService->startRPCServer()) or nodetool (enablethrift) to start it"); } /** @@ -510,8 +496,6 @@ public class CassandraDaemon // On linux, this doesn't entirely shut down Cassandra, just the RPC server. // jsvc takes care of taking the rest down logger.info("Cassandra shutting down..."); - if (thriftServer != null) - thriftServer.stop(); if (nativeTransportService != null) nativeTransportService.destroy(); StorageService.instance.setRpcReady(false);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4881d9c3/src/java/org/apache/cassandra/service/ClientState.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/ClientState.java b/src/java/org/apache/cassandra/service/ClientState.java index 5f01702..52e71ae 100644 --- a/src/java/org/apache/cassandra/service/ClientState.java +++ b/src/java/org/apache/cassandra/service/ClientState.java @@ -34,13 +34,13 @@ import org.apache.cassandra.config.Schema; import org.apache.cassandra.config.SchemaConstants; import org.apache.cassandra.cql3.QueryHandler; import org.apache.cassandra.cql3.QueryProcessor; +import org.apache.cassandra.cql3.Validation; import org.apache.cassandra.cql3.functions.Function; import org.apache.cassandra.db.SystemKeyspace; import org.apache.cassandra.exceptions.AuthenticationException; import org.apache.cassandra.exceptions.InvalidRequestException; import org.apache.cassandra.exceptions.UnauthorizedException; import org.apache.cassandra.schema.SchemaKeyspace; -import org.apache.cassandra.thrift.ThriftValidation; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.JVMStabilityInspector; import org.apache.cassandra.utils.CassandraVersion; @@ -148,7 +148,7 @@ public class ClientState } /** - * @return a ClientState object for external clients (thrift/native protocol users). + * @return a ClientState object for external clients (native protocol users). 
*/ public static ClientState forExternalCalls(SocketAddress remoteAddress) { @@ -290,7 +290,7 @@ public class ClientState public void hasColumnFamilyAccess(String keyspace, String columnFamily, Permission perm) throws UnauthorizedException, InvalidRequestException { - ThriftValidation.validateColumnFamily(keyspace, columnFamily); + Validation.validateColumnFamily(keyspace, columnFamily); hasAccess(keyspace, perm, DataResource.table(keyspace, columnFamily)); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/4881d9c3/src/java/org/apache/cassandra/service/EmbeddedCassandraService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/EmbeddedCassandraService.java b/src/java/org/apache/cassandra/service/EmbeddedCassandraService.java index 2515259..9dc92d7 100644 --- a/src/java/org/apache/cassandra/service/EmbeddedCassandraService.java +++ b/src/java/org/apache/cassandra/service/EmbeddedCassandraService.java @@ -20,8 +20,7 @@ package org.apache.cassandra.service; import java.io.IOException; /** - * An embedded, in-memory cassandra storage service that listens - * on the thrift interface as configured in cassandra.yaml + * An embedded, in-memory cassandra storage service. * This kind of service is useful when running unit tests of * services using cassandra for example. 
* http://git-wip-us.apache.org/repos/asf/cassandra/blob/4881d9c3/src/java/org/apache/cassandra/service/StorageProxy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java index 1d83b70..d5a9f47 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -1896,9 +1896,10 @@ public class StorageProxy implements StorageProxyMBean } /** - * Estimate the number of result rows (either cql3 rows or "thrift" rows, as called for by the command) per - * range in the ring based on our local data. This assumes that ranges are uniformly distributed across the cluster - * and that the queried data is also uniformly distributed. + * Estimate the number of result rows per range in the ring based on our local data. + * <p> + * This assumes that ranges are uniformly distributed across the cluster and + * that the queried data is also uniformly distributed. 
*/ private static float estimateResultsPerRange(PartitionRangeReadCommand command, Keyspace keyspace) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/4881d9c3/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index 8719c6c..15fb984 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -88,9 +88,6 @@ import org.apache.cassandra.service.paxos.CommitVerbHandler; import org.apache.cassandra.service.paxos.PrepareVerbHandler; import org.apache.cassandra.service.paxos.ProposeVerbHandler; import org.apache.cassandra.streaming.*; -import org.apache.cassandra.thrift.EndpointDetails; -import org.apache.cassandra.thrift.TokenRange; -import org.apache.cassandra.thrift.cassandraConstants; import org.apache.cassandra.tracing.TraceKeyspace; import org.apache.cassandra.transport.ProtocolVersion; import org.apache.cassandra.utils.*; @@ -353,37 +350,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE return Gossiper.instance.isEnabled(); } - // should only be called via JMX - public synchronized void startRPCServer() - { - checkServiceAllowedToStart("thrift"); - - if (daemon == null) - { - throw new IllegalStateException("No configured daemon"); - } - daemon.thriftServer.start(); - } - - public void stopRPCServer() - { - if (daemon == null) - { - throw new IllegalStateException("No configured daemon"); - } - if (daemon.thriftServer != null) - daemon.thriftServer.stop(); - } - - public boolean isRPCServerRunning() - { - if ((daemon == null) || (daemon.thriftServer == null)) - { - return false; - } - return daemon.thriftServer.isRunning(); - } - public synchronized void startNativeTransport() { checkServiceAllowedToStart("native transport"); @@ -428,11 
+394,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE logger.error("Stopping gossiper"); stopGossiping(); } - if (isRPCServerRunning()) - { - logger.error("Stopping RPC server"); - stopRPCServer(); - } if (isNativeTransportRunning()) { logger.error("Stopping native transport"); @@ -453,7 +414,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE private void shutdownClientServers() { setRpcReady(false); - stopRPCServer(); stopNativeTransport(); } @@ -616,7 +576,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE public synchronized void initServer(int delay) throws ConfigurationException { logger.info("Cassandra version: {}", FBUtilities.getReleaseVersionString()); - logger.info("Thrift API version: {}", cassandraConstants.VERSION); logger.info("CQL supported versions: {} (default: {})", StringUtils.join(ClientState.getCQLSupportedVersion(), ", "), ClientState.DEFAULT_CQL_VERSION); logger.info("Native protocol supported versions: {} (default: {})", @@ -1762,32 +1721,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE : getRangeToAddressMap(keyspace); for (Map.Entry<Range<Token>, List<InetAddress>> entry : rangeToAddressMap.entrySet()) - { - Range<Token> range = entry.getKey(); - List<InetAddress> addresses = entry.getValue(); - List<String> endpoints = new ArrayList<>(addresses.size()); - List<String> rpc_endpoints = new ArrayList<>(addresses.size()); - List<EndpointDetails> epDetails = new ArrayList<>(addresses.size()); - - for (InetAddress endpoint : addresses) - { - EndpointDetails details = new EndpointDetails(); - details.host = endpoint.getHostAddress(); - details.datacenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(endpoint); - details.rack = DatabaseDescriptor.getEndpointSnitch().getRack(endpoint); - - endpoints.add(details.host); - rpc_endpoints.add(getRpcaddress(endpoint)); - - epDetails.add(details); - } - 
- TokenRange tr = new TokenRange(tf.toString(range.left.getToken()), tf.toString(range.right.getToken()), endpoints) - .setEndpoint_details(epDetails) - .setRpc_endpoints(rpc_endpoints); - - ranges.add(tr); - } + ranges.add(TokenRange.create(tf, entry.getKey(), entry.getValue())); return ranges; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/4881d9c3/src/java/org/apache/cassandra/service/StorageServiceMBean.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java index 339b991..b569200 100644 --- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java +++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java @@ -510,15 +510,6 @@ public interface StorageServiceMBean extends NotificationEmitter // to determine if initialization has completed public boolean isInitialized(); - // allows a user to disable thrift - public void stopRPCServer(); - - // allows a user to reenable thrift - public void startRPCServer(); - - // to determine if thrift is running - public boolean isRPCServerRunning(); - public void stopNativeTransport(); public void startNativeTransport(); public boolean isNativeTransportRunning(); @@ -624,7 +615,7 @@ public interface StorageServiceMBean extends NotificationEmitter public void resetLocalSchema() throws IOException; /** - * Enables/Disables tracing for the whole system. Only thrift requests can start tracing currently. + * Enables/Disables tracing for the whole system. * * @param probability * ]0,1[ will enable tracing on a partial number of requests with the provided probability. 
0 will http://git-wip-us.apache.org/repos/asf/cassandra/blob/4881d9c3/src/java/org/apache/cassandra/service/TokenRange.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/TokenRange.java b/src/java/org/apache/cassandra/service/TokenRange.java new file mode 100644 index 0000000..0e46910 --- /dev/null +++ b/src/java/org/apache/cassandra/service/TokenRange.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.service; + +import java.net.InetAddress; +import java.util.*; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.locator.IEndpointSnitch; + +/** + * Holds token range information for the sake of {@link StorageService#describeRing}. + * + * This class mostly exists for the sake of {@link StorageService#describeRing}, + * which used to rely on a thrift class which this is the equivalent of. This is + * the reason this class behaves how it does and the reason for the format + * of {@code toString()} in particular (used by + * {@link StorageService#describeRingJMX}).
This class probably has no other + * good use than providing backward compatibility. + */ +public class TokenRange +{ + private final Token.TokenFactory tokenFactory; + + public final Range<Token> range; + public final List<EndpointDetails> endpoints; + + private TokenRange(Token.TokenFactory tokenFactory, Range<Token> range, List<EndpointDetails> endpoints) + { + this.tokenFactory = tokenFactory; + this.range = range; + this.endpoints = endpoints; + } + + private String toStr(Token tk) + { + return tokenFactory.toString(tk); + } + + public static TokenRange create(Token.TokenFactory tokenFactory, Range<Token> range, List<InetAddress> endpoints) + { + List<EndpointDetails> details = new ArrayList<>(endpoints.size()); + IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch(); + for (InetAddress ep : endpoints) + details.add(new EndpointDetails(ep, + StorageService.instance.getRpcaddress(ep), + snitch.getDatacenter(ep), + snitch.getRack(ep))); + return new TokenRange(tokenFactory, range, details); + } + + @Override + public String toString() + { + StringBuilder sb = new StringBuilder("TokenRange("); + + sb.append("start_token:").append(toStr(range.left)); + sb.append(", end_token:").append(toStr(range.right)); + + List<String> hosts = new ArrayList<>(endpoints.size()); + List<String> rpcs = new ArrayList<>(endpoints.size()); + for (EndpointDetails ep : endpoints) + { + hosts.add(ep.host.getHostAddress()); + rpcs.add(ep.rpcAddress); + } + + sb.append("endpoints:").append(hosts); + sb.append("rpc_endpoints:").append(rpcs); + sb.append("endpoint_details:").append(endpoints); + + sb.append(")"); + return sb.toString(); + } + + public static class EndpointDetails + { + public final InetAddress host; + public final String rpcAddress; + public final String datacenter; + public final String rack; + + private EndpointDetails(InetAddress host, String rpcAddress, String datacenter, String rack) + { + // dc and rack can be null, but host shouldn't + assert host != 
null; + this.host = host; + this.rpcAddress = rpcAddress; + this.datacenter = datacenter; + this.rack = rack; + } + + @Override + public String toString() + { + // Format matters for backward compatibility with describeRing() + String dcStr = datacenter == null ? "" : String.format(", datacenter:%s", datacenter); + String rackStr = rack == null ? "" : String.format(", rack:%s", rack); + return String.format("EndpointDetails(host:%s%s%s)", host.getHostAddress(), dcStr, rackStr); + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/4881d9c3/src/java/org/apache/cassandra/service/pager/PagingState.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/pager/PagingState.java b/src/java/org/apache/cassandra/service/pager/PagingState.java index 4a9ac39..bcf3979 100644 --- a/src/java/org/apache/cassandra/service/pager/PagingState.java +++ b/src/java/org/apache/cassandra/service/pager/PagingState.java @@ -23,10 +23,11 @@ import java.util.*; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.db.Clustering; -import org.apache.cassandra.db.LegacyLayout; +import org.apache.cassandra.db.CompactTables; import org.apache.cassandra.db.TypeSizes; import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.db.marshal.BytesType; +import org.apache.cassandra.db.marshal.CompositeType; import org.apache.cassandra.db.rows.Cell; import org.apache.cassandra.db.rows.Row; import org.apache.cassandra.io.util.DataInputBuffer; @@ -216,12 +217,12 @@ public class PagingState // If the last returned row has no cell, this means in 2.1/2.2 terms that we stopped on the row // marker. Note that this shouldn't happen if the table is COMPACT. 
assert !metadata.isCompactTable(); - mark = LegacyLayout.encodeCellName(metadata, row.clustering(), ByteBufferUtil.EMPTY_BYTE_BUFFER, null); + mark = encodeCellName(metadata, row.clustering(), ByteBufferUtil.EMPTY_BYTE_BUFFER, null); } else { Cell cell = cells.next(); - mark = LegacyLayout.encodeCellName(metadata, row.clustering(), cell.column().name.bytes, cell.column().isComplex() ? cell.path().get(0) : null); + mark = encodeCellName(metadata, row.clustering(), cell.column().name.bytes, cell.column().isComplex() ? cell.path().get(0) : null); } } else @@ -239,10 +240,84 @@ public class PagingState return null; return protocolVersion.isSmallerOrEqualTo(ProtocolVersion.V3) - ? LegacyLayout.decodeClustering(metadata, mark) + ? decodeClustering(metadata, mark) : Clustering.serializer.deserialize(mark, MessagingService.VERSION_30, makeClusteringTypes(metadata)); } + // Old (pre-3.0) encoding of cells. We need that for the protocol v3 as that is how things were encoded + private static ByteBuffer encodeCellName(CFMetaData metadata, Clustering clustering, ByteBuffer columnName, ByteBuffer collectionElement) + { + boolean isStatic = clustering == Clustering.STATIC_CLUSTERING; + + if (!metadata.isCompound()) + { + if (isStatic) + return columnName; + + assert clustering.size() == 1 : "Expected clustering size to be 1, but was " + clustering.size(); + return clustering.get(0); + } + + // We use comparator.size() rather than clustering.size() because of static clusterings + int clusteringSize = metadata.comparator.size(); + int size = clusteringSize + (metadata.isDense() ? 0 : 1) + (collectionElement == null ? 
0 : 1); + if (metadata.isSuper()) + size = clusteringSize + 1; + ByteBuffer[] values = new ByteBuffer[size]; + for (int i = 0; i < clusteringSize; i++) + { + if (isStatic) + { + values[i] = ByteBufferUtil.EMPTY_BYTE_BUFFER; + continue; + } + + ByteBuffer v = clustering.get(i); + // we can have null (only for dense compound tables for backward compatibility reasons) but that + // means we're done and should stop there as far as building the composite is concerned. + if (v == null) + return CompositeType.build(Arrays.copyOfRange(values, 0, i)); + + values[i] = v; + } + + if (metadata.isSuper()) + { + // We need to set the "column" (in thrift terms) name, i.e. the value corresponding to the subcomparator. + // What it is depends if this a cell for a declared "static" column or a "dynamic" column part of the + // super-column internal map. + assert columnName != null; // This should never be null for supercolumns, see decodeForSuperColumn() above + values[clusteringSize] = columnName.equals(CompactTables.SUPER_COLUMN_MAP_COLUMN) + ? collectionElement + : columnName; + } + else + { + if (!metadata.isDense()) + values[clusteringSize] = columnName; + if (collectionElement != null) + values[clusteringSize + 1] = collectionElement; + } + + return CompositeType.build(isStatic, values); + } + + private static Clustering decodeClustering(CFMetaData metadata, ByteBuffer value) + { + int csize = metadata.comparator.size(); + if (csize == 0) + return Clustering.EMPTY; + + if (metadata.isCompound() && CompositeType.isStaticName(value)) + return Clustering.STATIC_CLUSTERING; + + List<ByteBuffer> components = metadata.isCompound() + ? 
CompositeType.splitName(value) + : Collections.singletonList(value); + + return Clustering.make(components.subList(0, Math.min(csize, components.size())).toArray(new ByteBuffer[csize])); + } + @Override public final int hashCode() { http://git-wip-us.apache.org/repos/asf/cassandra/blob/4881d9c3/src/java/org/apache/cassandra/service/pager/PartitionRangeQueryPager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/pager/PartitionRangeQueryPager.java b/src/java/org/apache/cassandra/service/pager/PartitionRangeQueryPager.java index 5ba13a4..68547be 100644 --- a/src/java/org/apache/cassandra/service/pager/PartitionRangeQueryPager.java +++ b/src/java/org/apache/cassandra/service/pager/PartitionRangeQueryPager.java @@ -30,9 +30,6 @@ import org.apache.cassandra.transport.ProtocolVersion; /** * Pages a PartitionRangeReadCommand. - * - * Note: this only work for CQL3 queries for now (because thrift queries expect - * a different limit on the rows than on the columns, which complicates it). */ public class PartitionRangeQueryPager extends AbstractQueryPager { http://git-wip-us.apache.org/repos/asf/cassandra/blob/4881d9c3/src/java/org/apache/cassandra/service/pager/QueryPagers.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/pager/QueryPagers.java b/src/java/org/apache/cassandra/service/pager/QueryPagers.java deleted file mode 100644 index 15311ab..0000000 --- a/src/java/org/apache/cassandra/service/pager/QueryPagers.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.service.pager; - -import org.apache.cassandra.config.CFMetaData; -import org.apache.cassandra.db.*; -import org.apache.cassandra.db.filter.*; -import org.apache.cassandra.db.partitions.*; -import org.apache.cassandra.exceptions.RequestExecutionException; -import org.apache.cassandra.exceptions.RequestValidationException; -import org.apache.cassandra.service.ClientState; -import org.apache.cassandra.transport.ProtocolVersion; - -/** - * Static utility methods for paging. - */ -public class QueryPagers -{ - private QueryPagers() {}; - - /** - * Convenience method that count (live) cells/rows for a given slice of a row, but page underneath. 
- */ - public static int countPaged(CFMetaData metadata, - DecoratedKey key, - ColumnFilter columnFilter, - ClusteringIndexFilter filter, - DataLimits limits, - ConsistencyLevel consistencyLevel, - ClientState state, - final int pageSize, - int nowInSec, - boolean isForThrift, - long queryStartNanoTime) throws RequestValidationException, RequestExecutionException - { - SinglePartitionReadCommand command = SinglePartitionReadCommand.create(isForThrift, metadata, nowInSec, columnFilter, RowFilter.NONE, limits, key, filter); - final SinglePartitionPager pager = new SinglePartitionPager(command, null, ProtocolVersion.CURRENT); - - int count = 0; - while (!pager.isExhausted()) - { - try (PartitionIterator iter = pager.fetchPage(pageSize, consistencyLevel, state, queryStartNanoTime)) - { - DataLimits.Counter counter = limits.newCounter(nowInSec, true); - PartitionIterators.consume(counter.applyTo(iter)); - count += counter.counted(); - } - } - return count; - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/4881d9c3/src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java b/src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java index e400fb6..e95c358 100644 --- a/src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java +++ b/src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java @@ -89,8 +89,9 @@ public class SinglePartitionPager extends AbstractQueryPager protected ReadCommand nextPageReadCommand(int pageSize) { Clustering clustering = lastReturned == null ? null : lastReturned.clustering(command.metadata()); - DataLimits limits = (lastReturned == null || command.isForThrift()) ? limits().forPaging(pageSize) - : limits().forPaging(pageSize, key(), remainingInPartition()); + DataLimits limits = lastReturned == null + ? 
limits().forPaging(pageSize) + : limits().forPaging(pageSize, key(), remainingInPartition()); return command.forPaging(clustering, limits); }