http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/AbstractReadExecutor.java b/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
index 3aab12f..6696e10 100644
--- a/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
+++ b/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
@@ -29,14 +29,13 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.config.CFMetaData.SpeculativeRetry.RetryType;
-import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.config.ReadRepairDecision;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.db.ReadCommand;
-import org.apache.cassandra.db.ReadResponse;
-import org.apache.cassandra.db.Row;
+import org.apache.cassandra.db.SinglePartitionReadCommand;
 import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.partitions.PartitionIterator;
 import org.apache.cassandra.exceptions.ReadFailureException;
 import org.apache.cassandra.exceptions.ReadTimeoutException;
 import org.apache.cassandra.exceptions.UnavailableException;
@@ -62,22 +61,15 @@ public abstract class AbstractReadExecutor
     protected final ReadCommand command;
     protected final List<InetAddress> targetReplicas;
-    protected final RowDigestResolver resolver;
-    protected final ReadCallback<ReadResponse, Row> handler;
+    protected final ReadCallback handler;
     protected final TraceState traceState;

-    AbstractReadExecutor(ReadCommand command, ConsistencyLevel consistencyLevel, List<InetAddress> targetReplicas)
+    AbstractReadExecutor(Keyspace keyspace, ReadCommand command, ConsistencyLevel consistencyLevel, List<InetAddress> targetReplicas)
     {
         this.command = command;
         this.targetReplicas = targetReplicas;
-        resolver = new RowDigestResolver(command.ksName, command.key, targetReplicas.size());
-        traceState = Tracing.instance.get();
-        handler = new ReadCallback<>(resolver, consistencyLevel, command, targetReplicas);
-    }
-
-    private static boolean isLocalRequest(InetAddress replica)
-    {
-        return replica.equals(FBUtilities.getBroadcastAddress()) && StorageProxy.OPTIMIZE_LOCAL_REQUESTS;
+        this.handler = new ReadCallback(new DigestResolver(keyspace, command, consistencyLevel, targetReplicas.size()), consistencyLevel, command, targetReplicas);
+        this.traceState = Tracing.instance.get();
     }

     protected void makeDataRequests(Iterable<InetAddress> endpoints)
@@ -98,7 +90,7 @@ public abstract class AbstractReadExecutor

         for (InetAddress endpoint : endpoints)
         {
-            if (isLocalRequest(endpoint))
+            if (StorageProxy.canDoLocalRequest(endpoint))
             {
                 hasLocalEndpoint = true;
                 continue;
@@ -142,7 +134,7 @@ public abstract class AbstractReadExecutor
      * wait for an answer. Blocks until success or timeout, so it is caller's
      * responsibility to call maybeTryAdditionalReplicas first.
      */
-    public Row get() throws ReadFailureException, ReadTimeoutException, DigestMismatchException
+    public PartitionIterator get() throws ReadFailureException, ReadTimeoutException, DigestMismatchException
     {
         return handler.get();
     }
@@ -150,11 +142,11 @@ public abstract class AbstractReadExecutor
     /**
      * @return an executor appropriate for the configured speculative read policy
     */
-    public static AbstractReadExecutor getReadExecutor(ReadCommand command, ConsistencyLevel consistencyLevel) throws UnavailableException
+    public static AbstractReadExecutor getReadExecutor(SinglePartitionReadCommand command, ConsistencyLevel consistencyLevel) throws UnavailableException
     {
-        Keyspace keyspace = Keyspace.open(command.ksName);
-        List<InetAddress> allReplicas = StorageProxy.getLiveSortedEndpoints(keyspace, command.key);
-        ReadRepairDecision repairDecision = Schema.instance.getCFMetaData(command.ksName, command.cfName).newReadRepairDecision();
+        Keyspace keyspace = Keyspace.open(command.metadata().ksName);
+        List<InetAddress> allReplicas = StorageProxy.getLiveSortedEndpoints(keyspace, command.partitionKey());
+        ReadRepairDecision repairDecision = command.metadata().newReadRepairDecision();
         List<InetAddress> targetReplicas = consistencyLevel.filterForQuery(keyspace, allReplicas, repairDecision);

         // Throw UAE early if we don't have enough replicas.
@@ -166,19 +158,19 @@ public abstract class AbstractReadExecutor
             ReadRepairMetrics.attempted.mark();
         }

-        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(command.cfName);
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(command.metadata().cfId);
         RetryType retryType = cfs.metadata.getSpeculativeRetry().type;

         // Speculative retry is disabled *OR* there are simply no extra replicas to speculate.
         if (retryType == RetryType.NONE || consistencyLevel.blockFor(keyspace) == allReplicas.size())
-            return new NeverSpeculatingReadExecutor(command, consistencyLevel, targetReplicas);
+            return new NeverSpeculatingReadExecutor(keyspace, command, consistencyLevel, targetReplicas);

         if (targetReplicas.size() == allReplicas.size())
         {
             // CL.ALL, RRD.GLOBAL or RRD.DC_LOCAL and a single-DC.
             // We are going to contact every node anyway, so ask for 2 full data requests instead of 1, for redundancy
             // (same amount of requests in total, but we turn 1 digest request into a full blown data request).
-            return new AlwaysSpeculatingReadExecutor(cfs, command, consistencyLevel, targetReplicas);
+            return new AlwaysSpeculatingReadExecutor(keyspace, cfs, command, consistencyLevel, targetReplicas);
         }

         // RRD.NONE or RRD.DC_LOCAL w/ multiple DCs.
@@ -199,16 +191,16 @@ public abstract class AbstractReadExecutor
         targetReplicas.add(extraReplica);

         if (retryType == RetryType.ALWAYS)
-            return new AlwaysSpeculatingReadExecutor(cfs, command, consistencyLevel, targetReplicas);
+            return new AlwaysSpeculatingReadExecutor(keyspace, cfs, command, consistencyLevel, targetReplicas);
         else // PERCENTILE or CUSTOM.
-            return new SpeculatingReadExecutor(cfs, command, consistencyLevel, targetReplicas);
+            return new SpeculatingReadExecutor(keyspace, cfs, command, consistencyLevel, targetReplicas);
     }

     private static class NeverSpeculatingReadExecutor extends AbstractReadExecutor
     {
-        public NeverSpeculatingReadExecutor(ReadCommand command, ConsistencyLevel consistencyLevel, List<InetAddress> targetReplicas)
+        public NeverSpeculatingReadExecutor(Keyspace keyspace, ReadCommand command, ConsistencyLevel consistencyLevel, List<InetAddress> targetReplicas)
         {
-            super(command, consistencyLevel, targetReplicas);
+            super(keyspace, command, consistencyLevel, targetReplicas);
         }

         public void executeAsync()
@@ -234,12 +226,13 @@ public abstract class AbstractReadExecutor
         private final ColumnFamilyStore cfs;
         private volatile boolean speculated = false;

-        public SpeculatingReadExecutor(ColumnFamilyStore cfs,
+        public SpeculatingReadExecutor(Keyspace keyspace,
+                                       ColumnFamilyStore cfs,
                                        ReadCommand command,
                                        ConsistencyLevel consistencyLevel,
                                        List<InetAddress> targetReplicas)
         {
-            super(command, consistencyLevel, targetReplicas);
+            super(keyspace, command, consistencyLevel, targetReplicas);
             this.cfs = cfs;
         }
@@ -278,7 +271,7 @@ public abstract class AbstractReadExecutor
            {
                 // Could be waiting on the data, or on enough digests.
                 ReadCommand retryCommand = command;
-                if (resolver.getData() != null)
+                if (handler.resolver.isDataPresent())
                     retryCommand = command.copy().setIsDigestQuery(true);

                 InetAddress extraReplica = Iterables.getLast(targetReplicas);
@@ -304,12 +297,13 @@ public abstract class AbstractReadExecutor
     {
         private final ColumnFamilyStore cfs;

-        public AlwaysSpeculatingReadExecutor(ColumnFamilyStore cfs,
+        public AlwaysSpeculatingReadExecutor(Keyspace keyspace,
+                                             ColumnFamilyStore cfs,
                                              ReadCommand command,
                                              ConsistencyLevel consistencyLevel,
                                              List<InetAddress> targetReplicas)
         {
-            super(command, consistencyLevel, targetReplicas);
+            super(keyspace, command, consistencyLevel, targetReplicas);
             this.cfs = cfs;
         }
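[Editor's note: the executor selection in getReadExecutor() above reduces to a small decision tree over the configured speculative-retry mode. A minimal, self-contained sketch of that pattern follows; all names (RetryType, ReadExecutor, select) are local stand-ins for illustration, not the actual Cassandra classes.]

```java
import java.util.List;

public class ExecutorSelectionSketch
{
    enum RetryType { NONE, PERCENTILE, CUSTOM, ALWAYS }

    interface ReadExecutor { void executeAsync(); }

    static ReadExecutor select(RetryType retryType, int blockFor, List<String> allReplicas, List<String> targetReplicas)
    {
        // Speculation is pointless if it's disabled or if we already block on every live replica.
        if (retryType == RetryType.NONE || blockFor == allReplicas.size())
            return () -> System.out.println("never speculate: " + targetReplicas);

        // Contacting every replica anyway: turn one digest request into a second data request instead.
        if (targetReplicas.size() == allReplicas.size())
            return () -> System.out.println("always speculate: " + targetReplicas);

        if (retryType == RetryType.ALWAYS)
            return () -> System.out.println("always speculate (extra replica): " + targetReplicas);

        // PERCENTILE or CUSTOM: speculate only once the configured latency threshold passes.
        return () -> System.out.println("speculate on slow response: " + targetReplicas);
    }

    public static void main(String[] args)
    {
        select(RetryType.PERCENTILE, 2, List.of("a", "b", "c"), List.of("a", "b")).executeAsync();
    }
}
```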
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/service/AbstractRowResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/AbstractRowResolver.java b/src/java/org/apache/cassandra/service/AbstractRowResolver.java
deleted file mode 100644
index f362047..0000000
--- a/src/java/org/apache/cassandra/service/AbstractRowResolver.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.service;
-
-import java.nio.ByteBuffer;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.ReadResponse;
-import org.apache.cassandra.db.Row;
-import org.apache.cassandra.net.MessageIn;
-import org.apache.cassandra.utils.concurrent.Accumulator;
-
-public abstract class AbstractRowResolver implements IResponseResolver<ReadResponse, Row>
-{
-    protected static final Logger logger = LoggerFactory.getLogger(AbstractRowResolver.class);
-
-    protected final String keyspaceName;
-    // Accumulator gives us non-blocking thread-safety with optimal algorithmic constraints
-    protected final Accumulator<MessageIn<ReadResponse>> replies;
-    protected final DecoratedKey key;
-
-    public AbstractRowResolver(ByteBuffer key, String keyspaceName, int maxResponseCount)
-    {
-        this.key = StorageService.getPartitioner().decorateKey(key);
-        this.keyspaceName = keyspaceName;
-        this.replies = new Accumulator<>(maxResponseCount);
-    }
-
-    public void preprocess(MessageIn<ReadResponse> message)
-    {
-        replies.add(message);
-    }
-
-    public Iterable<MessageIn<ReadResponse>> getMessages()
-    {
-        return replies;
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/service/AsyncRepairCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/AsyncRepairCallback.java b/src/java/org/apache/cassandra/service/AsyncRepairCallback.java
index 6ac765b..dec5319 100644
--- a/src/java/org/apache/cassandra/service/AsyncRepairCallback.java
+++ b/src/java/org/apache/cassandra/service/AsyncRepairCallback.java
@@ -29,11 +29,11 @@ import org.apache.cassandra.utils.WrappedRunnable;

 public class AsyncRepairCallback implements IAsyncCallback<ReadResponse>
 {
-    private final RowDataResolver repairResolver;
+    private final DataResolver repairResolver;
     private final int blockfor;
     protected final AtomicInteger received = new AtomicInteger(0);

-    public AsyncRepairCallback(RowDataResolver repairResolver, int blockfor)
+    public AsyncRepairCallback(DataResolver repairResolver, int blockfor)
     {
         this.repairResolver = repairResolver;
         this.blockfor = blockfor;
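[Editor's note: AsyncRepairCallback's body is not shown beyond its constructor, but the visible fields (an AtomicInteger of received responses and a blockfor threshold) suggest the usual count-then-trigger callback shape. A hedged, self-contained sketch of that pattern, with hypothetical names and an assumed trigger body:]

```java
import java.util.concurrent.atomic.AtomicInteger;

public class CountingCallbackSketch
{
    private final AtomicInteger received = new AtomicInteger(0);
    private final int blockFor;
    private final Runnable onEnoughResponses; // stand-in for kicking off the repair comparison

    public CountingCallbackSketch(int blockFor, Runnable onEnoughResponses)
    {
        this.blockFor = blockFor;
        this.onEnoughResponses = onEnoughResponses;
    }

    public void response(Object message)
    {
        // Exactly one response (the one that reaches the threshold) triggers the follow-up work.
        if (received.incrementAndGet() == blockFor)
            onEnoughResponses.run();
    }

    public static void main(String[] args)
    {
        CountingCallbackSketch cb = new CountingCallbackSketch(2, () -> System.out.println("comparing responses"));
        cb.response("r1");
        cb.response("r2"); // second response reaches blockFor and fires the runnable once
    }
}
```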
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/service/CASRequest.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CASRequest.java b/src/java/org/apache/cassandra/service/CASRequest.java
index 3d86637..1db100d 100644
--- a/src/java/org/apache/cassandra/service/CASRequest.java
+++ b/src/java/org/apache/cassandra/service/CASRequest.java
@@ -17,8 +17,9 @@
  */
 package org.apache.cassandra.service;

-import org.apache.cassandra.db.ColumnFamily;
-import org.apache.cassandra.db.filter.IDiskAtomFilter;
+import org.apache.cassandra.db.SinglePartitionReadCommand;
+import org.apache.cassandra.db.partitions.FilteredPartition;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
 import org.apache.cassandra.exceptions.InvalidRequestException;

 /**
@@ -27,19 +28,19 @@ import org.apache.cassandra.exceptions.InvalidRequestException;
 public interface CASRequest
 {
     /**
-     * The filter to use to fetch the value to compare for the CAS.
+     * The command to use to fetch the value to compare for the CAS.
      */
-    public IDiskAtomFilter readFilter();
+    public SinglePartitionReadCommand readCommand(int nowInSec);

     /**
      * Returns whether the provided CF, which represents the values fetched using the
      * readFilter(), matches the CAS conditions this object stands for.
      */
-    public boolean appliesTo(ColumnFamily current) throws InvalidRequestException;
+    public boolean appliesTo(FilteredPartition current) throws InvalidRequestException;

     /**
      * The updates to perform on a CAS success. The values fetched using the readFilter()
      * are passed as argument.
      */
-    public ColumnFamily makeUpdates(ColumnFamily current) throws InvalidRequestException;
+    public PartitionUpdate makeUpdates(FilteredPartition current) throws InvalidRequestException;
 }
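[Editor's note: the new CASRequest contract is a three-step flow: read the current value, test the condition, build the update. A toy "IF NOT EXISTS"-style implementation of that flow follows; the types here are local stand-ins for the Cassandra classes, purely to show the shape.]

```java
public class IfNotExistsRequestSketch
{
    interface Partition { boolean isEmpty(); } // stand-in for FilteredPartition
    interface Update {}                        // stand-in for PartitionUpdate

    // Mirrors readCommand(int) / appliesTo(...) / makeUpdates(...) from the interface above.
    static class IfNotExists
    {
        String readCommand(int nowInSec) { return "read partition as of " + nowInSec; } // stand-in for SinglePartitionReadCommand
        boolean appliesTo(Partition current) { return current.isEmpty(); }              // the "IF NOT EXISTS" condition
        Update makeUpdates(Partition current) { return new Update() {}; }               // build the insert to apply on success
    }

    public static void main(String[] args)
    {
        IfNotExists req = new IfNotExists();
        System.out.println(req.appliesTo(() -> true)); // empty partition -> condition applies -> true
    }
}
```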
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/service/CacheService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CacheService.java b/src/java/org/apache/cassandra/service/CacheService.java
index a775627..e82e8a4 100644
--- a/src/java/org/apache/cassandra/service/CacheService.java
+++ b/src/java/org/apache/cassandra/service/CacheService.java
@@ -43,17 +43,20 @@ import org.apache.cassandra.cache.AutoSavingCache.CacheSerializer;
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.composites.CellName;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.filter.*;
+import org.apache.cassandra.db.partitions.ArrayBackedCachedPartition;
+import org.apache.cassandra.db.partitions.CachedPartition;
 import org.apache.cassandra.db.context.CounterContext;
-import org.apache.cassandra.db.filter.QueryFilter;
-import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.utils.concurrent.OpOrder;

 public class CacheService implements CacheServiceMBean
 {
@@ -362,24 +365,45 @@ public class CacheService implements CacheServiceMBean
         public Future<Pair<CounterCacheKey, ClockAndCount>> deserialize(DataInputStream in, final ColumnFamilyStore cfs) throws IOException
         {
             final ByteBuffer partitionKey = ByteBufferUtil.readWithLength(in);
-            final CellName cellName = cfs.metadata.comparator.cellFromByteBuffer(ByteBufferUtil.readWithLength(in));
+            final ByteBuffer cellName = ByteBufferUtil.readWithLength(in);
             return StageManager.getStage(Stage.READ).submit(new Callable<Pair<CounterCacheKey, ClockAndCount>>()
             {
                 public Pair<CounterCacheKey, ClockAndCount> call() throws Exception
                 {
                     DecoratedKey key = cfs.partitioner.decorateKey(partitionKey);
-                    QueryFilter filter = QueryFilter.getNamesFilter(key,
-                                                                    cfs.metadata.cfName,
-                                                                    FBUtilities.singleton(cellName, cfs.metadata.comparator),
-                                                                    Long.MIN_VALUE);
-                    ColumnFamily cf = cfs.getTopLevelColumns(filter, Integer.MIN_VALUE);
-                    if (cf == null)
-                        return null;
-                    Cell cell = cf.getColumn(cellName);
-                    if (cell == null || !cell.isLive(Long.MIN_VALUE))
-                        return null;
-                    ClockAndCount clockAndCount = CounterContext.instance().getLocalClockAndCount(cell.value());
-                    return Pair.create(CounterCacheKey.create(cfs.metadata.cfId, partitionKey, cellName), clockAndCount);
+                    LegacyLayout.LegacyCellName name = LegacyLayout.decodeCellName(cfs.metadata, cellName);
+                    ColumnDefinition column = name.column;
+                    CellPath path = name.collectionElement == null ? null : CellPath.create(name.collectionElement);
+
+                    int nowInSec = FBUtilities.nowInSeconds();
+                    ColumnFilter.Builder builder = ColumnFilter.selectionBuilder();
+                    if (path == null)
+                        builder.add(column);
+                    else
+                        builder.select(column, path);
+
+                    ClusteringIndexFilter filter = new ClusteringIndexNamesFilter(FBUtilities.<Clustering>singleton(name.clustering, cfs.metadata.comparator), false);
+                    SinglePartitionReadCommand cmd = SinglePartitionReadCommand.create(cfs.metadata, nowInSec, key, builder.build(), filter);
+                    try (OpOrder.Group op = cfs.readOrdering.start(); RowIterator iter = UnfilteredRowIterators.filter(cmd.queryMemtableAndDisk(cfs, op), nowInSec))
+                    {
+                        Cell cell;
+                        if (column.isStatic())
+                        {
+                            cell = iter.staticRow().getCell(column);
+                        }
+                        else
+                        {
+                            if (!iter.hasNext())
+                                return null;
+                            cell = iter.next().getCell(column);
+                        }
+
+                        if (cell == null)
+                            return null;
+
+                        ClockAndCount clockAndCount = CounterContext.instance().getLocalClockAndCount(cell.value());
+                        return Pair.create(CounterCacheKey.create(cfs.metadata.cfId, partitionKey, name.clustering, column, path), clockAndCount);
+                    }
                 }
             });
         }
@@ -395,14 +419,19 @@ public class CacheService implements CacheServiceMBean
         public Future<Pair<RowCacheKey, IRowCacheEntry>> deserialize(DataInputStream in, final ColumnFamilyStore cfs) throws IOException
         {
             final ByteBuffer buffer = ByteBufferUtil.readWithLength(in);
+            final int rowsToCache = cfs.metadata.getCaching().rowCache.rowsToCache;
+
             return StageManager.getStage(Stage.READ).submit(new Callable<Pair<RowCacheKey, IRowCacheEntry>>()
             {
                 public Pair<RowCacheKey, IRowCacheEntry> call() throws Exception
                 {
                     DecoratedKey key = cfs.partitioner.decorateKey(buffer);
-                    QueryFilter cacheFilter = new QueryFilter(key, cfs.getColumnFamilyName(), cfs.readFilterForCache(), Integer.MIN_VALUE);
-                    ColumnFamily data = cfs.getTopLevelColumns(cacheFilter, Integer.MIN_VALUE);
-                    return Pair.create(new RowCacheKey(cfs.metadata.cfId, key), (IRowCacheEntry) data);
+                    int nowInSec = FBUtilities.nowInSeconds();
+                    try (OpOrder.Group op = cfs.readOrdering.start(); UnfilteredRowIterator iter = SinglePartitionReadCommand.fullPartitionRead(cfs.metadata, nowInSec, key).queryMemtableAndDisk(cfs, op))
+                    {
+                        CachedPartition toCache = ArrayBackedCachedPartition.create(DataLimits.cqlLimits(rowsToCache).filter(iter, nowInSec), nowInSec);
+                        return Pair.create(new RowCacheKey(cfs.metadata.cfId, key), (IRowCacheEntry)toCache);
+                    }
                 }
             });
         }
@@ -423,7 +452,7 @@ public class CacheService implements CacheServiceMBean
             ByteBufferUtil.writeWithLength(key.key, out);
             out.writeInt(key.desc.generation);
             out.writeBoolean(true);
-            key.desc.getFormat().getIndexSerializer(cfm).serialize(entry, out);
+            key.desc.getFormat().getIndexSerializer(cfm, key.desc.version, SerializationHeader.forKeyCache(cfm)).serialize(entry, out);
         }

         public Future<Pair<KeyCacheKey, RowIndexEntry>> deserialize(DataInputStream input, ColumnFamilyStore cfs) throws IOException
@@ -443,7 +472,10 @@ public class CacheService implements CacheServiceMBean
                 RowIndexEntry.Serializer.skipPromotedIndex(input);
                 return null;
             }
-            RowIndexEntry entry = reader.descriptor.getFormat().getIndexSerializer(reader.metadata).deserialize(input, reader.descriptor.version);
+            RowIndexEntry.IndexSerializer<?> indexSerializer = reader.descriptor.getFormat().getIndexSerializer(reader.metadata,
+                                                                                                                reader.descriptor.version,
+                                                                                                                SerializationHeader.forKeyCache(cfs.metadata));
+            RowIndexEntry entry = indexSerializer.deserialize(input);
            return Futures.immediateFuture(Pair.create(new KeyCacheKey(cfs.metadata.cfId, reader.descriptor, key), entry));
         }
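[Editor's note: the row-cache deserializer above now reads the partition through a DataLimits filter so that only the first rowsToCache rows are cached. A toy version of that "read through a limit, cache the prefix" idea, using plain Java streams as a stand-in for the iterator/limits machinery:]

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class RowCachePrefixSketch
{
    static List<String> readThroughLimit(Stream<String> partitionRows, int rowsToCache)
    {
        // limit() plays the role of DataLimits.cqlLimits(rowsToCache).filter(iter, nowInSec) here.
        return partitionRows.limit(rowsToCache).collect(Collectors.toList());
    }

    public static void main(String[] args)
    {
        // Only the head of the partition ends up in the cache entry.
        System.out.println(readThroughLimit(Stream.of("r1", "r2", "r3", "r4"), 2)); // [r1, r2]
    }
}
```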
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/service/DataResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/DataResolver.java b/src/java/org/apache/cassandra/service/DataResolver.java
new file mode 100644
index 0000000..b2d1954
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/DataResolver.java
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service;
+
+import java.net.InetAddress;
+import java.util.*;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.cassandra.concurrent.Stage;
+import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.filter.*;
+import org.apache.cassandra.db.partitions.*;
+import org.apache.cassandra.exceptions.ReadTimeoutException;
+import org.apache.cassandra.net.AsyncOneResponse;
+import org.apache.cassandra.net.MessageIn;
+import org.apache.cassandra.net.MessageOut;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.tracing.Tracing;
+import org.apache.cassandra.utils.FBUtilities;
+
+public class DataResolver extends ResponseResolver
+{
+    private final List<AsyncOneResponse> repairResults = Collections.synchronizedList(new ArrayList<AsyncOneResponse>());
+
+    public DataResolver(Keyspace keyspace, ReadCommand command, ConsistencyLevel consistency, int maxResponseCount)
+    {
+        super(keyspace, command, consistency, maxResponseCount);
+    }
+
+    public PartitionIterator getData()
+    {
+        ReadResponse response = responses.iterator().next().payload;
+        return UnfilteredPartitionIterators.filter(response.makeIterator(), command.nowInSec());
+    }
+
+    public PartitionIterator resolve()
+    {
+        // We could get more responses while this method runs, which is ok (we're happy to ignore any response not here
+        // at the beginning of this method), so grab the response count once and use that through the method.
+        int count = responses.size();
+        List<UnfilteredPartitionIterator> iters = new ArrayList<>(count);
+        InetAddress[] sources = new InetAddress[count];
+        for (int i = 0; i < count; i++)
+        {
+            MessageIn<ReadResponse> msg = responses.get(i);
+            iters.add(msg.payload.makeIterator());
+            sources[i] = msg.from;
+        }
+
+        // Even though every response should honor the limit, we might have more than requested post-reconciliation,
+        // so ensure we're respecting the limit.
+        DataLimits.Counter counter = command.limits().newCounter(command.nowInSec(), true);
+        return new CountingPartitionIterator(mergeWithShortReadProtection(iters, sources, counter), counter);
+    }
+
+    private PartitionIterator mergeWithShortReadProtection(List<UnfilteredPartitionIterator> results, InetAddress[] sources, DataLimits.Counter resultCounter)
+    {
+        // If we have only one result, there is no read repair to do and we can't get short reads
+        if (results.size() == 1)
+            return UnfilteredPartitionIterators.filter(results.get(0), command.nowInSec());
+
+        UnfilteredPartitionIterators.MergeListener listener = new RepairMergeListener(sources);
+
+        // So-called "short reads" stem from nodes returning only a subset of the results they have for a partition due to the limit,
+        // but that subset not being enough post-reconciliation. So if we don't have a limit, don't bother.
+        if (command.limits().isUnlimited())
+            return UnfilteredPartitionIterators.mergeAndFilter(results, command.nowInSec(), listener);
+
+        for (int i = 0; i < results.size(); i++)
+            results.set(i, new ShortReadProtectedIterator(sources[i], results.get(i), resultCounter));
+
+        return UnfilteredPartitionIterators.mergeAndFilter(results, command.nowInSec(), listener);
+    }
+
+    private class RepairMergeListener implements UnfilteredPartitionIterators.MergeListener
+    {
+        private final InetAddress[] sources;
+
+        public RepairMergeListener(InetAddress[] sources)
+        {
+            this.sources = sources;
+        }
+
+        public UnfilteredRowIterators.MergeListener getRowMergeListener(DecoratedKey partitionKey, List<UnfilteredRowIterator> versions)
+        {
+            return new MergeListener(partitionKey, columns(versions), isReversed(versions));
+        }
+
+        private PartitionColumns columns(List<UnfilteredRowIterator> versions)
+        {
+            Columns statics = Columns.NONE;
+            Columns regulars = Columns.NONE;
+            for (UnfilteredRowIterator iter : versions)
+            {
+                if (iter == null)
+                    continue;
+
+                PartitionColumns cols = iter.columns();
+                statics = statics.mergeTo(cols.statics);
+                regulars = regulars.mergeTo(cols.regulars);
+            }
+            return new PartitionColumns(statics, regulars);
+        }
+
+        private boolean isReversed(List<UnfilteredRowIterator> versions)
+        {
+            assert !versions.isEmpty();
+            // Everything will be in the same order
+            return versions.get(0).isReverseOrder();
+        }
+
+        public void close()
+        {
+            try
+            {
+                FBUtilities.waitOnFutures(repairResults, DatabaseDescriptor.getWriteRpcTimeout());
+            }
+            catch (TimeoutException ex)
+            {
+                // We got all responses, but timed out while repairing
+                int blockFor = consistency.blockFor(keyspace);
+                if (Tracing.isTracing())
+                    Tracing.trace("Timed out while read-repairing after receiving all {} data and digest responses", blockFor);
+                else
+                    logger.debug("Timeout while read-repairing after receiving all {} data and digest responses", blockFor);
+
+                throw new ReadTimeoutException(consistency, blockFor-1, blockFor, true);
+            }
+        }
+
+        private class MergeListener implements UnfilteredRowIterators.MergeListener
+        {
+            private final DecoratedKey partitionKey;
+            private final PartitionColumns columns;
+            private final boolean isReversed;
+            private final PartitionUpdate[] repairs = new PartitionUpdate[sources.length];
+
+            private final Row.Writer[] currentRows = new Row.Writer[sources.length];
+            private Clustering currentClustering;
+            private ColumnDefinition currentColumn;
+
+            private final Slice.Bound[] markerOpen = new Slice.Bound[sources.length];
+            private final DeletionTime[] markerTime = new DeletionTime[sources.length];
+
+            public MergeListener(DecoratedKey partitionKey, PartitionColumns columns, boolean isReversed)
+            {
+                this.partitionKey = partitionKey;
+                this.columns = columns;
+                this.isReversed = isReversed;
+            }
+
+            private PartitionUpdate update(int i)
+            {
+                PartitionUpdate upd = repairs[i];
+                if (upd == null)
+                {
+                    upd = new PartitionUpdate(command.metadata(), partitionKey, columns, 1);
+                    repairs[i] = upd;
+                }
+                return upd;
+            }
+
+            private Row.Writer currentRow(int i)
+            {
+                Row.Writer row = currentRows[i];
+                if (row == null)
+                {
+                    row = currentClustering == Clustering.STATIC_CLUSTERING ? update(i).staticWriter() : update(i).writer();
+                    currentClustering.writeTo(row);
+                    currentRows[i] = row;
+                }
+                return row;
+            }
+
+            public void onMergePartitionLevelDeletion(DeletionTime mergedDeletion, DeletionTime[] versions)
+            {
+                for (int i = 0; i < versions.length; i++)
+                {
+                    DeletionTime version = versions[i];
+                    if (mergedDeletion.supersedes(versions[i]))
+                        update(i).addPartitionDeletion(mergedDeletion);
+                }
+            }
+
+            public void onMergingRows(Clustering clustering,
+                                      LivenessInfo mergedInfo,
+                                      DeletionTime mergedDeletion,
+                                      Row[] versions)
+            {
+                currentClustering = clustering;
+                for (int i = 0; i < versions.length; i++)
+                {
+                    Row version = versions[i];
+
+                    if (version == null || mergedInfo.supersedes(version.primaryKeyLivenessInfo()))
+                        currentRow(i).writePartitionKeyLivenessInfo(mergedInfo);
+
+                    if (version == null || mergedDeletion.supersedes(version.deletion()))
+                        currentRow(i).writeRowDeletion(mergedDeletion);
+                }
+            }
+
+            public void onMergedComplexDeletion(ColumnDefinition c, DeletionTime mergedCompositeDeletion, DeletionTime[] versions)
+            {
+                currentColumn = c;
+                for (int i = 0; i < versions.length; i++)
+                {
+                    DeletionTime version = versions[i] == null ? DeletionTime.LIVE : versions[i];
+                    if (mergedCompositeDeletion.supersedes(version))
+                        currentRow(i).writeComplexDeletion(c, mergedCompositeDeletion);
+                }
+            }
+
+            public void onMergedCells(Cell mergedCell, Cell[] versions)
+            {
+                for (int i = 0; i < versions.length; i++)
+                {
+                    Cell version = versions[i];
+                    Cell toAdd = version == null ? mergedCell : Cells.diff(mergedCell, version);
+                    if (toAdd != null)
+                        toAdd.writeTo(currentRow(i));
+                }
+            }
+
+            public void onRowDone()
+            {
+                for (int i = 0; i < currentRows.length; i++)
+                {
+                    if (currentRows[i] != null)
+                        currentRows[i].endOfRow();
+                }
+                Arrays.fill(currentRows, null);
+            }
+
+            public void onMergedRangeTombstoneMarkers(RangeTombstoneMarker merged, RangeTombstoneMarker[] versions)
+            {
+                for (int i = 0; i < versions.length; i++)
+                {
+                    RangeTombstoneMarker marker = versions[i];
+                    // Note that boundaries are both close and open, so it's not one or the other
+                    if (merged.isClose(isReversed) && markerOpen[i] != null)
+                    {
+                        Slice.Bound open = markerOpen[i];
+                        Slice.Bound close = merged.isBoundary() ? ((RangeTombstoneBoundaryMarker)merged).createCorrespondingCloseBound(isReversed).clustering() : merged.clustering();
+                        update(i).addRangeTombstone(Slice.make(isReversed ? close : open, isReversed ? open : close), markerTime[i]);
+                    }
+                    if (merged.isOpen(isReversed) && (marker == null || merged.openDeletionTime(isReversed).supersedes(marker.openDeletionTime(isReversed))))
+                    {
+                        markerOpen[i] = merged.isBoundary() ? ((RangeTombstoneBoundaryMarker)merged).createCorrespondingOpenBound(isReversed).clustering() : merged.clustering();
+                        markerTime[i] = merged.openDeletionTime(isReversed);
+                    }
+                }
+            }
+
+            public void close()
+            {
+                for (int i = 0; i < repairs.length; i++)
+                {
+                    if (repairs[i] == null)
+                        continue;
+
+                    // use a separate verb here because we don't want these to get the white-glove hint-
+                    // on-timeout behavior that a "real" mutation gets
+                    Tracing.trace("Sending read-repair-mutation to {}", sources[i]);
+                    MessageOut<Mutation> msg = new Mutation(repairs[i]).createMessage(MessagingService.Verb.READ_REPAIR);
+                    repairResults.add(MessagingService.instance().sendRR(msg, sources[i]));
+                }
+            }
+        }
+    }
+
+    private class ShortReadProtectedIterator extends CountingUnfilteredPartitionIterator
+    {
+        private final InetAddress source;
+        private final DataLimits.Counter postReconciliationCounter;
+
+        private ShortReadProtectedIterator(InetAddress source, UnfilteredPartitionIterator iterator, DataLimits.Counter postReconciliationCounter)
+        {
+            super(iterator, command.limits().newCounter(command.nowInSec(), false));
+            this.source = source;
+            this.postReconciliationCounter = postReconciliationCounter;
+        }
+
+        @Override
+        public UnfilteredRowIterator next()
+        {
+            return new ShortReadProtectedRowIterator(super.next());
+        }
+
+        private class ShortReadProtectedRowIterator extends WrappingUnfilteredRowIterator
+        {
+            private boolean initialReadIsDone;
+            private UnfilteredRowIterator shortReadContinuation;
+            private Clustering lastClustering;
+
+            ShortReadProtectedRowIterator(UnfilteredRowIterator iter)
+            {
+                super(iter);
+            }
+
+            @Override
+            public boolean hasNext()
+            {
+                if (super.hasNext())
+                    return true;
+
+                initialReadIsDone = true;
+
+                if (shortReadContinuation != null && shortReadContinuation.hasNext())
+                    return true;
+
+                return checkForShortRead();
+            }
+
+            @Override
+            public Unfiltered next()
+            {
+                Unfiltered next = initialReadIsDone ? shortReadContinuation.next() : super.next();
+
+                if (next.kind() == Unfiltered.Kind.ROW)
+                    lastClustering = ((Row)next).clustering();
+
+                return next;
+            }
+
+            @Override
+            public void close()
+            {
+                try
+                {
+                    super.close();
+                }
+                finally
+                {
+                    if (shortReadContinuation != null)
+                        shortReadContinuation.close();
+                }
+            }
+
+            private boolean checkForShortRead()
+            {
+                assert shortReadContinuation == null || !shortReadContinuation.hasNext();
+
+                // We have a short read if the node this iterator came from has returned the requested number of
+                // rows for that partition (i.e. it has stopped returning results due to the limit), but some of
+                // those results haven't made it into the final result post-reconciliation due to other nodes'
+                // tombstones. If that is the case, then the node might have more results that we should fetch,
+                // as otherwise we might return fewer results than required, or results that shouldn't be returned
+                // (because the node has tombstones that hide future results from other nodes but that haven't
+                // been returned due to the limit).
+                // Also note that we only get here once all the results for this node have been returned, and so
+                // if the node had returned the requested number but we still get here, it implies some results were
+                // skipped during reconciliation.
+                if (!counter.isDoneForPartition())
+                    return false;
+
+                assert !postReconciliationCounter.isDoneForPartition();
+
+                // We need to try to query enough additional results to fulfill our query, but because we could still
+                // get short reads on that additional query, just querying the number of results we miss may not be
+                // enough. But we know that when this node answered n rows (counter.countedInCurrentPartition), only
+                // x rows (postReconciliationCounter.countedInCurrentPartition()) made it into the final result.
+                // So our ratio of live rows to requested rows is x/n, so since we miss n-x rows, we estimate that
+                // we should request m rows so that m * x/n = n-x, that is m = (n^2/x) - n.
+                // Also note that it's ok if we retrieve more results than necessary since our top-level iterator is a
+                // counting iterator.
+                int n = postReconciliationCounter.countedInCurrentPartition();
+                int x = counter.countedInCurrentPartition();
+                int toQuery = x == 0
+                            ? n * 2     // We didn't get any answer, so (somewhat randomly) ask for twice as much
+                            : Math.max(((n * n) / x) - n, 1);
+
+                DataLimits retryLimits = command.limits().forShortReadRetry(toQuery);
+                ClusteringIndexFilter filter = command.clusteringIndexFilter(partitionKey());
+                ClusteringIndexFilter retryFilter = lastClustering == null ? filter : filter.forPaging(metadata().comparator, lastClustering, false);
+                SinglePartitionReadCommand<?> cmd = SinglePartitionReadCommand.create(command.metadata(),
+                                                                                      command.nowInSec(),
+                                                                                      command.columnFilter(),
+                                                                                      command.rowFilter(),
+                                                                                      retryLimits,
+                                                                                      partitionKey(),
+                                                                                      retryFilter);
+
+                shortReadContinuation = doShortReadRetry(cmd);
+                return shortReadContinuation.hasNext();
+            }
+
+            private UnfilteredRowIterator doShortReadRetry(SinglePartitionReadCommand<?> retryCommand)
+            {
+                DataResolver resolver = new DataResolver(keyspace, retryCommand, ConsistencyLevel.ONE, 1);
+                ReadCallback handler = new ReadCallback(resolver, ConsistencyLevel.ONE, retryCommand, Collections.singletonList(source));
+                if (StorageProxy.canDoLocalRequest(source))
+                    StageManager.getStage(Stage.READ).maybeExecuteImmediately(new StorageProxy.LocalReadRunnable(retryCommand, handler));
+                else
+                    MessagingService.instance().sendRRWithFailure(retryCommand.createMessage(), source, handler);
+
+                // We don't call handler.get() because we want to preserve tombstones since we're still in the middle of merging node results.
+                handler.awaitResults();
+                assert resolver.responses.size() == 1;
+                return UnfilteredPartitionIterators.getOnlyElement(resolver.responses.get(0).payload.makeIterator(), retryCommand);
+            }
+        }
+    }
+
+    public boolean isDataPresent()
+    {
+        return !responses.isEmpty();
+    }
+}
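[Editor's note: the retry sizing in checkForShortRead() above is worth seeing in isolation. If a replica sent n rows but only x survived reconciliation, the live-to-sent ratio is x/n, so to recover the n-x missing rows the code solves m * x/n = n-x, i.e. m = n^2/x - n (doubling when nothing survived). A self-contained extraction of just that arithmetic:]

```java
public class ShortReadRetrySizing
{
    static int rowsToRequest(int sent, int survived)
    {
        return survived == 0
             ? sent * 2                                        // nothing survived: (somewhat arbitrarily) ask for twice as much
             : Math.max(((sent * sent) / survived) - sent, 1); // m = n^2/x - n, and always at least 1
    }

    public static void main(String[] args)
    {
        System.out.println(rowsToRequest(100, 20)); // 400: 1/5 survival rate, 80 rows missing
        System.out.println(rowsToRequest(100, 0));  // 200
    }
}
```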
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/service/DigestResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/DigestResolver.java b/src/java/org/apache/cassandra/service/DigestResolver.java
new file mode 100644
index 0000000..12b0626
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/DigestResolver.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.partitions.PartitionIterator;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
+import org.apache.cassandra.net.MessageIn;
+
+public class DigestResolver extends ResponseResolver
+{
+    private volatile ReadResponse dataResponse;
+
+    public DigestResolver(Keyspace keyspace, ReadCommand command, ConsistencyLevel consistency, int maxResponseCount)
+    {
+        super(keyspace, command, consistency, maxResponseCount);
+    }
+
+    @Override
+    public void preprocess(MessageIn<ReadResponse> message)
+    {
+        super.preprocess(message);
+        if (dataResponse == null && !message.payload.isDigestQuery())
+            dataResponse = message.payload;
+    }
+
+    /**
+     * Special case of resolve() so that CL.ONE reads never throw DigestMismatchException in the foreground
+     */
+    public PartitionIterator getData()
+    {
+        assert isDataPresent();
+        return UnfilteredPartitionIterators.filter(dataResponse.makeIterator(), command.nowInSec());
+    }
+
+    /*
+     * This method handles two different scenarios:
+     *
+     * a) we're handling the initial read of data from the closest replica + digests
+     *    from the rest. In this case we check the digests against each other,
+     *    throw an exception if there is a mismatch, otherwise return the data row.
+     *
+     * b) we're checking additional digests that arrived after the minimum to handle
+     *    the requested ConsistencyLevel, i.e. asynchronous read repair check
+     */
+    public PartitionIterator resolve() throws DigestMismatchException
+    {
+        if (responses.size() == 1)
+            return getData();
+
+        if (logger.isDebugEnabled())
+            logger.debug("resolving {} responses", responses.size());
+
+        long start = System.nanoTime();
+
+        // validate digests against each other; throw immediately on mismatch.
+        ByteBuffer digest = null;
+        for (MessageIn<ReadResponse> message : responses)
+        {
+            ReadResponse response = message.payload;
+
+            ByteBuffer newDigest = response.digest();
+            if (digest == null)
+                digest = newDigest;
+            else if (!digest.equals(newDigest))
+                // rely on the fact that only single partition queries use digests
+                throw new DigestMismatchException(((SinglePartitionReadCommand)command).partitionKey(), digest, newDigest);
+        }
+
+        if (logger.isDebugEnabled())
+            logger.debug("resolve: {} ms.", TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
+
+        return UnfilteredPartitionIterators.filter(dataResponse.makeIterator(), command.nowInSec());
+    }
+
+    public boolean isDataPresent()
+    {
+        return dataResponse != null;
+    }
+}
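[Editor's note: the heart of DigestResolver.resolve() is the pairwise digest check: all digests must agree, and the first disagreement aborts resolution so the caller can fall back to a full data read. That core, reduced to a standalone method (IllegalStateException standing in for DigestMismatchException):]

```java
import java.nio.ByteBuffer;
import java.util.List;

public class DigestCheckSketch
{
    static void compare(List<ByteBuffer> digests)
    {
        ByteBuffer first = null;
        for (ByteBuffer d : digests)
        {
            if (first == null)
                first = d;                                          // remember the first digest seen
            else if (!first.equals(d))
                throw new IllegalStateException("digest mismatch"); // triggers the data-read retry upstream
        }
    }

    public static void main(String[] args)
    {
        compare(List.of(ByteBuffer.wrap(new byte[]{ 1 }), ByteBuffer.wrap(new byte[]{ 1 }))); // ok, all agree
    }
}
```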
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/service/IReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/IReadCommand.java b/src/java/org/apache/cassandra/service/IReadCommand.java
deleted file mode 100644
index c6a129e..0000000
--- a/src/java/org/apache/cassandra/service/IReadCommand.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.service;
-
-public interface IReadCommand
-{
-    public String getKeyspace();
-    public long getTimeout();
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/service/IResponseResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/IResponseResolver.java b/src/java/org/apache/cassandra/service/IResponseResolver.java
deleted file mode 100644
index 17c8bff..0000000
--- a/src/java/org/apache/cassandra/service/IResponseResolver.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.service;
-
-import org.apache.cassandra.net.MessageIn;
-
-public interface IResponseResolver<TMessage, TResolved> {
-
-    /**
-     * This Method resolves the responses that are passed in . for example : if
-     * its write response then all we get is true or false return values which
-     * implies if the writes were successful but for reads its more complicated
-     * you need to look at the responses and then based on differences schedule
-     * repairs . Hence you need to derive a response resolver based on your
-     * needs from this interface.
-     */
-    public TResolved resolve() throws DigestMismatchException;
-
-    public boolean isDataPresent();
-
-    /**
-     * returns the data response without comparing with any digests
-     */
-    public TResolved getData();
-
-    public void preprocess(MessageIn<TMessage> message);
-    public Iterable<MessageIn<TMessage>> getMessages();
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java b/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
deleted file mode 100644
index 640681b..0000000
--- a/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.service;
-
-import java.net.InetAddress;
-import java.util.*;
-import java.util.concurrent.ConcurrentLinkedQueue;
-
-import com.google.common.collect.AbstractIterator;
-
-import org.apache.cassandra.db.ColumnFamily;
-import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.RangeSliceReply;
-import org.apache.cassandra.db.Row;
-import org.apache.cassandra.net.AsyncOneResponse;
-import org.apache.cassandra.net.MessageIn;
-import org.apache.cassandra.utils.CloseableIterator;
-import org.apache.cassandra.utils.MergeIterator;
-import org.apache.cassandra.utils.Pair;
-
-/**
- * Turns RangeSliceReply objects into row (string -> CF) maps, resolving
- * to the most recent ColumnFamily and setting up read repairs as necessary.
- */
-public class RangeSliceResponseResolver implements IResponseResolver<RangeSliceReply, Iterable<Row>>
-{
-    private static final Comparator<Pair<Row,InetAddress>> pairComparator = new Comparator<Pair<Row, InetAddress>>()
-    {
-        public int compare(Pair<Row, InetAddress> o1, Pair<Row, InetAddress> o2)
-        {
-            return o1.left.key.compareTo(o2.left.key);
-        }
-    };
-
-    private final String keyspaceName;
-    private final long timestamp;
-    private List<InetAddress> sources;
-    protected final Collection<MessageIn<RangeSliceReply>> responses = new ConcurrentLinkedQueue<MessageIn<RangeSliceReply>>();
-    public final List<AsyncOneResponse> repairResults = new ArrayList<AsyncOneResponse>();
-
-    public RangeSliceResponseResolver(String keyspaceName, long timestamp)
-    {
-        this.keyspaceName = keyspaceName;
-        this.timestamp = timestamp;
-    }
-
-    public void setSources(List<InetAddress> endpoints)
-    {
-        this.sources = endpoints;
-    }
-
-    public List<Row> getData()
-    {
-        MessageIn<RangeSliceReply> response = responses.iterator().next();
-        return response.payload.rows;
-    }
-
-    // Note: this would deserialize the response a 2nd time if getData was called first.
-    // (this is not currently an issue since we don't do read repair for range queries.)
-    public Iterable<Row> resolve()
-    {
-        ArrayList<RowIterator> iters = new ArrayList<RowIterator>(responses.size());
-        int n = 0;
-        for (MessageIn<RangeSliceReply> response : responses)
-        {
-            RangeSliceReply reply = response.payload;
-            n = Math.max(n, reply.rows.size());
-            iters.add(new RowIterator(reply.rows.iterator(), response.from));
-        }
-        // for each row, compute the combination of all different versions seen, and repair incomplete versions
-        // TODO do we need to call close?
-        CloseableIterator<Row> iter = MergeIterator.get(iters, pairComparator, new Reducer());
-
-        List<Row> resolvedRows = new ArrayList<Row>(n);
-        while (iter.hasNext())
-            resolvedRows.add(iter.next());
-
-        return resolvedRows;
-    }
-
-    public void preprocess(MessageIn message)
-    {
-        responses.add(message);
-    }
-
-    public boolean isDataPresent()
-    {
-        return !responses.isEmpty();
-    }
-
-    private static class RowIterator extends AbstractIterator<Pair<Row,InetAddress>> implements CloseableIterator<Pair<Row,InetAddress>>
-    {
-        private final Iterator<Row> iter;
-        private final InetAddress source;
-
-        private RowIterator(Iterator<Row> iter, InetAddress source)
-        {
-            this.iter = iter;
-            this.source = source;
-        }
-
-        protected Pair<Row,InetAddress> computeNext()
-        {
-            return iter.hasNext() ? Pair.create(iter.next(), source) : endOfData();
-        }
-
-        public void close() {}
-    }
-
-    public Iterable<MessageIn<RangeSliceReply>> getMessages()
-    {
-        return responses;
-    }
-
-    private class Reducer extends MergeIterator.Reducer<Pair<Row,InetAddress>, Row>
-    {
-        List<ColumnFamily> versions = new ArrayList<ColumnFamily>(sources.size());
-        List<InetAddress> versionSources = new ArrayList<InetAddress>(sources.size());
-        DecoratedKey key;
-
-        public void reduce(Pair<Row,InetAddress> current)
-        {
-            key = current.left.key;
-            versions.add(current.left.cf);
-            versionSources.add(current.right);
-        }
-
-        protected Row getReduced()
-        {
-            ColumnFamily resolved = versions.size() > 1
-                                  ? RowDataResolver.resolveSuperset(versions, timestamp)
-                                  : versions.get(0);
-            if (versions.size() < sources.size())
-            {
-                // add placeholder rows for sources that didn't have any data, so maybeScheduleRepairs sees them
-                for (InetAddress source : sources)
-                {
-                    if (!versionSources.contains(source))
-                    {
-                        versions.add(null);
-                        versionSources.add(source);
-                    }
-                }
-            }
-            // resolved can be null even if versions doesn't have all nulls because of the call to removeDeleted in resolveSuperSet
-            if (resolved != null)
-                repairResults.addAll(RowDataResolver.scheduleRepairs(resolved, keyspaceName, key, versions, versionSources));
-            versions.clear();
-            versionSources.clear();
-            return new Row(key, resolved);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java b/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java
deleted file mode 100644
index 0f3726c..0000000
--- a/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.service;
-
-import org.apache.cassandra.db.AbstractRangeCommand;
-import org.apache.cassandra.db.RangeSliceReply;
-import org.apache.cassandra.net.IVerbHandler;
-import org.apache.cassandra.net.MessageIn;
-import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.tracing.Tracing;
-
-public class RangeSliceVerbHandler implements IVerbHandler<AbstractRangeCommand>
-{
-    public void doVerb(MessageIn<AbstractRangeCommand> message, int id)
-    {
-        if (StorageService.instance.isBootstrapMode())
-        {
-            /* Don't service reads! */
-            throw new RuntimeException("Cannot service reads while bootstrapping!");
-        }
-        RangeSliceReply reply = new RangeSliceReply(message.payload.executeLocally());
-        Tracing.trace("Enqueuing response to {}", message.from);
-        MessagingService.instance().sendReply(reply.createMessage(), id, message.from);
-    }
-}
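[Editor's note: the deleted IResponseResolver/AbstractRowResolver pair is replaced by the new ResponseResolver base class, whose body is truncated at the end of this section. The sketch below is therefore inferred only from how DataResolver and DigestResolver use it (a responses accumulator, preprocess(), an abstract isDataPresent()); treat it as a guess at the shape, not the real class.]

```java
import java.util.ArrayList;
import java.util.List;

public abstract class ResponseResolverSketch<M>
{
    protected final List<M> responses = new ArrayList<>(); // the real code uses a fixed-size Accumulator

    public void preprocess(M message)
    {
        responses.add(message); // every incoming response is recorded for later resolution
    }

    public abstract boolean isDataPresent();

    public static void main(String[] args)
    {
        ResponseResolverSketch<String> r = new ResponseResolverSketch<String>()
        {
            public boolean isDataPresent() { return !responses.isEmpty(); }
        };
        r.preprocess("data-response");
        System.out.println(r.isDataPresent()); // true
    }
}
```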
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/service/ReadCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ReadCallback.java b/src/java/org/apache/cassandra/service/ReadCallback.java
index 0c008e7..d548019 100644
--- a/src/java/org/apache/cassandra/service/ReadCallback.java
+++ b/src/java/org/apache/cassandra/service/ReadCallback.java
@@ -30,8 +30,8 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.ReadCommand;
-import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.partitions.PartitionIterator;
 import org.apache.cassandra.exceptions.ReadFailureException;
 import org.apache.cassandra.exceptions.ReadTimeoutException;
 import org.apache.cassandra.exceptions.UnavailableException;
@@ -46,16 +46,16 @@ import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.concurrent.SimpleCondition;

-public class ReadCallback<TMessage, TResolved> implements IAsyncCallbackWithFailure<TMessage>
+public class ReadCallback implements IAsyncCallbackWithFailure<ReadResponse>
 {
     protected static final Logger logger = LoggerFactory.getLogger( ReadCallback.class );

-    public final IResponseResolver<TMessage, TResolved> resolver;
+    public final ResponseResolver resolver;
     private final SimpleCondition condition = new SimpleCondition();
-    final long start;
+    private final long start;
     final int blockfor;
     final List<InetAddress> endpoints;
-    private final IReadCommand command;
+    private final ReadCommand command;
     private final ConsistencyLevel consistencyLevel;
     private static final AtomicIntegerFieldUpdater<ReadCallback> recievedUpdater
         = AtomicIntegerFieldUpdater.newUpdater(ReadCallback.class, "received");
@@ -69,14 +69,17 @@ public class ReadCallback<TMessage, TResolved> implements IAsyncCallbackWithFail
     /**
      * Constructor when response count has to be calculated and blocked for.
     */
-    public ReadCallback(IResponseResolver<TMessage, TResolved> resolver, ConsistencyLevel consistencyLevel, IReadCommand command, List<InetAddress> filteredEndpoints)
+    public ReadCallback(ResponseResolver resolver, ConsistencyLevel consistencyLevel, ReadCommand command, List<InetAddress> filteredEndpoints)
     {
-        this(resolver, consistencyLevel, consistencyLevel.blockFor(Keyspace.open(command.getKeyspace())), command, Keyspace.open(command.getKeyspace()), filteredEndpoints);
-        if (logger.isTraceEnabled())
-            logger.trace(String.format("Blockfor is %s; setting up requests to %s", blockfor, StringUtils.join(this.endpoints, ",")));
+        this(resolver,
+             consistencyLevel,
+             consistencyLevel.blockFor(Keyspace.open(command.metadata().ksName)),
+             command,
+             Keyspace.open(command.metadata().ksName),
+             filteredEndpoints);
     }

-    public ReadCallback(IResponseResolver<TMessage, TResolved> resolver, ConsistencyLevel consistencyLevel, int blockfor, IReadCommand command, Keyspace keyspace, List<InetAddress> endpoints)
+    public ReadCallback(ResponseResolver resolver, ConsistencyLevel consistencyLevel, int blockfor, ReadCommand command, Keyspace keyspace, List<InetAddress> endpoints)
     {
         this.command = command;
         this.keyspace = keyspace;
@@ -86,7 +89,10 @@ public class ReadCallback<TMessage, TResolved> implements IAsyncCallbackWithFail
         this.start = System.nanoTime();
         this.endpoints = endpoints;
         // we don't support read repair (or rapid read protection) for range scans yet (CASSANDRA-6897)
-        assert !(resolver instanceof RangeSliceResponseResolver) || blockfor >= endpoints.size();
+        assert !(command instanceof PartitionRangeReadCommand) || blockfor >= endpoints.size();
+
+        if (logger.isTraceEnabled())
+            logger.trace(String.format("Blockfor is %s; setting up requests to %s", blockfor, StringUtils.join(this.endpoints, ",")));
     }

     public boolean await(long timePastStart, TimeUnit unit)
@@ -102,31 +108,46 @@ public class ReadCallback<TMessage, TResolved> implements IAsyncCallbackWithFail
         }
     }

-    public TResolved get() throws ReadFailureException, ReadTimeoutException, DigestMismatchException
+    public void awaitResults() throws ReadFailureException, ReadTimeoutException
     {
-        if (!await(command.getTimeout(), TimeUnit.MILLISECONDS))
+        boolean signaled = await(command.getTimeout(), TimeUnit.MILLISECONDS);
+        boolean failed = blockfor + failures > endpoints.size();
+        if (signaled && !failed)
+            return;
+
+        if (Tracing.isTracing())
         {
-            // Same as for writes, see AbstractWriteResponseHandler
-            ReadTimeoutException ex = new ReadTimeoutException(consistencyLevel, received, blockfor, resolver.isDataPresent());
-            Tracing.trace("Read timeout: {}", ex.toString());
-            if (logger.isDebugEnabled())
-                logger.debug("Read timeout: {}", ex.toString());
-            throw ex;
+            String gotData = received > 0 ? (resolver.isDataPresent() ? " (including data)" : " (only digests)") : "";
+            Tracing.trace("{}; received {} of {} responses{}", new Object[]{ (failed ? "Failed" : "Timed out"), received, blockfor, gotData });
         }
-
-        if (blockfor + failures > endpoints.size())
+        else if (logger.isDebugEnabled())
        {
-            ReadFailureException ex = new ReadFailureException(consistencyLevel, received, failures, blockfor, resolver.isDataPresent());
-
-            if (logger.isDebugEnabled())
-                logger.debug("Read failure: {}", ex.toString());
-            throw ex;
+            String gotData = received > 0 ? (resolver.isDataPresent() ? " (including data)" : " (only digests)") : "";
+            logger.debug("{}; received {} of {} responses{}", new Object[]{ (failed ? "Failed" : "Timed out"), received, blockfor, gotData });
         }

-        return blockfor == 1 ? resolver.getData() : resolver.resolve();
+        // Same as for writes, see AbstractWriteResponseHandler
+        throw failed
+            ? new ReadFailureException(consistencyLevel, received, failures, blockfor, resolver.isDataPresent())
+            : new ReadTimeoutException(consistencyLevel, received, blockfor, resolver.isDataPresent());
+    }
+
+    public PartitionIterator get() throws ReadFailureException, ReadTimeoutException, DigestMismatchException
+    {
+        awaitResults();
+
+        PartitionIterator result = blockfor == 1 ? resolver.getData() : resolver.resolve();
+        if (logger.isDebugEnabled())
+            logger.debug("Read: {} ms.", TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
+        return result;
+    }
+
+    public int blockFor()
+    {
+        return blockfor;
     }

-    public void response(MessageIn<TMessage> message)
+    public void response(MessageIn<ReadResponse> message)
     {
         resolver.preprocess(message);
         int n = waitingFor(message.from)
@@ -165,13 +186,13 @@ public class ReadCallback<TMessage, TResolved> implements IAsyncCallbackWithFail
         return received;
     }

-    public void response(TMessage result)
+    public void response(ReadResponse result)
     {
-        MessageIn<TMessage> message = MessageIn.create(FBUtilities.getBroadcastAddress(),
-                                                       result,
-                                                       Collections.<String, byte[]>emptyMap(),
-                                                       MessagingService.Verb.INTERNAL_RESPONSE,
-                                                       MessagingService.current_version);
+        MessageIn<ReadResponse> message = MessageIn.create(FBUtilities.getBroadcastAddress(),
+                                                           result,
+                                                           Collections.<String, byte[]>emptyMap(),
+                                                           MessagingService.Verb.INTERNAL_RESPONSE,
+                                                           MessagingService.current_version);
         response(message);
     }

@@ -196,7 +217,7 @@ public class ReadCallback<TMessage, TResolved> implements IAsyncCallbackWithFail

         public void run()
         {
-            // If the resolver is a RowDigestResolver, we need to do a full data read if there is a mismatch.
+            // If the resolver is a DigestResolver, we need to do a full data read if there is a mismatch.
             // Otherwise, resolve will send the repairs directly if needs be (and in that case we should never
             // get a digest mismatch)
             try
             {
@@ -205,7 +226,7 @@
             }
             catch (DigestMismatchException e)
             {
-                assert resolver instanceof RowDigestResolver;
+                assert resolver instanceof DigestResolver;

                 if (traceState != null)
                     traceState.trace("Digest mismatch: {}", e.toString());
@@ -214,11 +235,10 @@

                 ReadRepairMetrics.repairedBackground.mark();

-                ReadCommand readCommand = (ReadCommand) command;
-                final RowDataResolver repairResolver = new RowDataResolver(readCommand.ksName, readCommand.key, readCommand.filter(), readCommand.timestamp, endpoints.size());
+                final DataResolver repairResolver = new DataResolver(keyspace, command, consistencyLevel, endpoints.size());
                 AsyncRepairCallback repairHandler = new AsyncRepairCallback(repairResolver, endpoints.size());

-                MessageOut<ReadCommand> message = ((ReadCommand) command).createMessage();
+                MessageOut<ReadCommand> message = command.createMessage();
                 for (InetAddress endpoint : endpoints)
                     MessagingService.instance().sendRR(message, endpoint, repairHandler);
             }
// Otherwise, resolve will send the repairs directly if needs be (and in that case we should never // get a digest mismatch) try @@ -205,7 +226,7 @@ public class ReadCallback<TMessage, TResolved> implements IAsyncCallbackWithFail } catch (DigestMismatchException e) { - assert resolver instanceof RowDigestResolver; + assert resolver instanceof DigestResolver; if (traceState != null) traceState.trace("Digest mismatch: {}", e.toString()); @@ -214,11 +235,10 @@ public class ReadCallback<TMessage, TResolved> implements IAsyncCallbackWithFail ReadRepairMetrics.repairedBackground.mark(); - ReadCommand readCommand = (ReadCommand) command; - final RowDataResolver repairResolver = new RowDataResolver(readCommand.ksName, readCommand.key, readCommand.filter(), readCommand.timestamp, endpoints.size()); + final DataResolver repairResolver = new DataResolver(keyspace, command, consistencyLevel, endpoints.size()); AsyncRepairCallback repairHandler = new AsyncRepairCallback(repairResolver, endpoints.size()); - MessageOut<ReadCommand> message = ((ReadCommand) command).createMessage(); + MessageOut<ReadCommand> message = command.createMessage(); for (InetAddress endpoint : endpoints) MessagingService.instance().sendRR(message, endpoint, repairHandler); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/service/ResponseResolver.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/ResponseResolver.java b/src/java/org/apache/cassandra/service/ResponseResolver.java new file mode 100644 index 0000000..e7c94a1 --- /dev/null +++ b/src/java/org/apache/cassandra/service/ResponseResolver.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.cassandra.service; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.db.*; +import org.apache.cassandra.db.partitions.*; +import org.apache.cassandra.net.MessageIn; +import org.apache.cassandra.utils.concurrent.Accumulator; + +public abstract class ResponseResolver +{ + protected static final Logger logger = LoggerFactory.getLogger(ResponseResolver.class); + + protected final Keyspace keyspace; + protected final ReadCommand command; + protected final ConsistencyLevel consistency; + + // Accumulator gives us non-blocking thread-safety with optimal algorithmic constraints + protected final Accumulator<MessageIn<ReadResponse>> responses; + + public ResponseResolver(Keyspace keyspace, ReadCommand command, ConsistencyLevel consistency, int maxResponseCount) + { + this.keyspace = keyspace; + this.command = command; + this.consistency = consistency; + this.responses = new Accumulator<>(maxResponseCount); + } + + public abstract PartitionIterator getData(); + public abstract PartitionIterator resolve() throws DigestMismatchException; + + public abstract boolean isDataPresent(); + + public void preprocess(MessageIn<ReadResponse> message) + { + responses.add(message); + } + + public Iterable<MessageIn<ReadResponse>> getMessages() + { + return responses; + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/service/RowDataResolver.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/RowDataResolver.java b/src/java/org/apache/cassandra/service/RowDataResolver.java deleted file mode 100644 index e935ce7..0000000 --- a/src/java/org/apache/cassandra/service/RowDataResolver.java +++ /dev/null @@ -1,177 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */
-package org.apache.cassandra.service;
-
-import java.net.InetAddress;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-import com.google.common.collect.Iterables;
-
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
-import org.apache.cassandra.db.filter.IDiskAtomFilter;
-import org.apache.cassandra.db.filter.QueryFilter;
-import org.apache.cassandra.net.*;
-import org.apache.cassandra.tracing.Tracing;
-import org.apache.cassandra.utils.CloseableIterator;
-import org.apache.cassandra.utils.FBUtilities;
-
-public class RowDataResolver extends AbstractRowResolver
-{
-    private int maxLiveCount = 0;
-    public List<AsyncOneResponse> repairResults = Collections.emptyList();
-    private final IDiskAtomFilter filter;
-    private final long timestamp;
-
-    public RowDataResolver(String keyspaceName, ByteBuffer key, IDiskAtomFilter qFilter, long timestamp, int maxResponseCount)
-    {
-        super(key, keyspaceName, maxResponseCount);
-        this.filter = qFilter;
-        this.timestamp = timestamp;
-    }
-
-    /*
-     * This method handles the following scenario:
-     *
-     * there was a mismatch on the initial read, so we redid the digest requests
-     * as full data reads. In this case we need to compute the most recent version
-     * of each column, and send diffs to out-of-date replicas.
-     */
-    public Row resolve() throws DigestMismatchException
-    {
-        int replyCount = replies.size();
-        if (logger.isDebugEnabled())
-            logger.debug("resolving {} responses", replyCount);
-        long start = System.nanoTime();
-
-        ColumnFamily resolved;
-        if (replyCount > 1)
-        {
-            List<ColumnFamily> versions = new ArrayList<>(replyCount);
-            List<InetAddress> endpoints = new ArrayList<>(replyCount);
-
-            for (MessageIn<ReadResponse> message : replies)
-            {
-                ReadResponse response = message.payload;
-                ColumnFamily cf = response.row().cf;
-                assert !response.isDigestQuery() : "Received digest response to repair read from " + message.from;
-                versions.add(cf);
-                endpoints.add(message.from);
-
-                // compute maxLiveCount to prevent short reads -- see https://issues.apache.org/jira/browse/CASSANDRA-2643
-                int liveCount = cf == null ? 0 : filter.getLiveCount(cf, timestamp);
-                if (liveCount > maxLiveCount)
-                    maxLiveCount = liveCount;
-            }
-
-            resolved = resolveSuperset(versions, timestamp);
-            if (logger.isDebugEnabled())
-                logger.debug("versions merged");
-
-            // send updates to any replica that was missing part of the full row
-            // (resolved can be null even if versions doesn't have all nulls because of the call to removeDeleted in resolveSuperSet)
-            if (resolved != null)
-                repairResults = scheduleRepairs(resolved, keyspaceName, key, versions, endpoints);
-        }
-        else
-        {
-            resolved = replies.get(0).payload.row().cf;
-        }
-
-        if (logger.isDebugEnabled())
-            logger.debug("resolve: {} ms.", TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
-
-        return new Row(key, resolved);
-    }
-
-    /**
-     * For each row version, compare with resolved (the superset of all row versions);
-     * if it is missing anything, send a mutation to the endpoint it came from.
-     */
-    public static List<AsyncOneResponse> scheduleRepairs(ColumnFamily resolved, String keyspaceName, DecoratedKey key, List<ColumnFamily> versions, List<InetAddress> endpoints)
-    {
-        List<AsyncOneResponse> results = new ArrayList<AsyncOneResponse>(versions.size());
-
-        for (int i = 0; i < versions.size(); i++)
-        {
-            ColumnFamily diffCf = ColumnFamily.diff(versions.get(i), resolved);
-            if (diffCf == null) // no repair needs to happen
-                continue;
-
-            // create and send the mutation message based on the diff
-            Mutation mutation = new Mutation(keyspaceName, key.getKey(), diffCf);
-            // use a separate verb here because we don't want these to get the white glove hint-
-            // on-timeout behavior that a "real" mutation gets
-            Tracing.trace("Sending read-repair-mutation to {}", endpoints.get(i));
-            results.add(MessagingService.instance().sendRR(mutation.createMessage(MessagingService.Verb.READ_REPAIR),
-                                                           endpoints.get(i)));
-        }
-
-        return results;
-    }
-
-    static ColumnFamily resolveSuperset(Iterable<ColumnFamily> versions, long now)
-    {
-        assert Iterables.size(versions) > 0;
-
-        ColumnFamily resolved = null;
-        for (ColumnFamily cf : versions)
-        {
-            if (cf == null)
-                continue;
-
-            if (resolved == null)
-                resolved = cf.cloneMeShallow();
-            else
-                resolved.delete(cf);
-        }
-        if (resolved == null)
-            return null;
-
-        // mimic the collectCollatedColumn + removeDeleted path that getColumnFamily takes.
-        // this will handle removing columns and subcolumns that are suppressed by a row or
-        // supercolumn tombstone.
-        QueryFilter filter = new QueryFilter(null, resolved.metadata().cfName, new IdentityQueryFilter(), now);
-        List<CloseableIterator<Cell>> iters = new ArrayList<>(Iterables.size(versions));
-        for (ColumnFamily version : versions)
-            if (version != null)
-                iters.add(FBUtilities.closeableIterator(version.iterator()));
-        filter.collateColumns(resolved, iters, Integer.MIN_VALUE);
-        return ColumnFamilyStore.removeDeleted(resolved, Integer.MIN_VALUE);
-    }
-
-    public Row getData()
-    {
-        assert !replies.isEmpty();
-        return replies.get(0).payload.row();
-    }
-
-    public boolean isDataPresent()
-    {
-        return !replies.isEmpty();
-    }
-
-    public int getMaxLiveCount()
-    {
-        return maxLiveCount;
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/service/RowDigestResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/RowDigestResolver.java b/src/java/org/apache/cassandra/service/RowDigestResolver.java
deleted file mode 100644
index 82ccc1a..0000000
--- a/src/java/org/apache/cassandra/service/RowDigestResolver.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.service;
-
-import java.nio.ByteBuffer;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.cassandra.db.ColumnFamily;
-import org.apache.cassandra.db.ReadResponse;
-import org.apache.cassandra.db.Row;
-import org.apache.cassandra.net.MessageIn;
-
-public class RowDigestResolver extends AbstractRowResolver
-{
-    public RowDigestResolver(String keyspaceName, ByteBuffer key, int maxResponseCount)
-    {
-        super(key, keyspaceName, maxResponseCount);
-    }
-
-    /**
-     * Special case of resolve() so that CL.ONE reads never throw DigestMismatchException in the foreground
-     */
-    public Row getData()
-    {
-        for (MessageIn<ReadResponse> message : replies)
-        {
-            ReadResponse result = message.payload;
-            if (!result.isDigestQuery())
-                return result.row();
-        }
-        return null;
-    }
-
-    /*
-     * This method handles two different scenarios:
-     *
-     * a) we're handling the initial read, of data from the closest replica + digests
-     *    from the rest. In this case we check the digests against each other,
-     *    throw an exception if there is a mismatch, otherwise return the data row.
-     *
-     * b) we're checking additional digests that arrived after the minimum to handle
-     *    the requested ConsistencyLevel, i.e. asynchronous read repair check
-     */
-    public Row resolve() throws DigestMismatchException
-    {
-        if (logger.isDebugEnabled())
-            logger.debug("resolving {} responses", replies.size());
-
-        long start = System.nanoTime();
-
-        // validate digests against each other; throw immediately on mismatch.
-        // also extract the data reply, if any.
-        ColumnFamily data = null;
-        ByteBuffer digest = null;
-
-        for (MessageIn<ReadResponse> message : replies)
-        {
-            ReadResponse response = message.payload;
-
-            ByteBuffer newDigest;
-            if (response.isDigestQuery())
-            {
-                newDigest = response.digest();
-            }
-            else
-            {
-                // note that this allows for multiple data replies, post-CASSANDRA-5932
-                data = response.row().cf;
-                newDigest = ColumnFamily.digest(data);
-            }
-
-            if (digest == null)
-                digest = newDigest;
-            else if (!digest.equals(newDigest))
-                throw new DigestMismatchException(key, digest, newDigest);
-        }
-
-        if (logger.isDebugEnabled())
-            logger.debug("resolve: {} ms.", TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
-        return new Row(key, data);
-    }
-
-    public boolean isDataPresent()
-    {
-        for (MessageIn<ReadResponse> message : replies)
-        {
-            if (!message.payload.isDigestQuery())
-                return true;
-        }
-        return false;
-    }
-}
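
Two notes for readers following the resolver rewrite. First, the digest-checking contract that the new DigestResolver inherits from the deleted RowDigestResolver is easy to lose in the diff: hash every reply, throw on the first disagreement, and hand back the data reply. Below is a minimal, self-contained sketch of that rule, not the patched classes themselves; Response, digestOf and resolve are invented names, and MD5 stands in only because Cassandra's read digests have historically been MD5-based.

import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.Arrays;
import java.util.List;

public final class DigestCheckSketch
{
    // Hypothetical stand-in for ReadResponse: a reply carries either full data or just a digest.
    static final class Response
    {
        final boolean isDigest;
        final byte[] payload;

        Response(boolean isDigest, byte[] payload)
        {
            this.isDigest = isDigest;
            this.payload = payload;
        }
    }

    static byte[] digestOf(byte[] data) throws Exception
    {
        // any stable hash would do for the sketch
        return MessageDigest.getInstance("MD5").digest(data);
    }

    // Validate digests against each other, throwing immediately on mismatch, and
    // extract the data reply: the same shape as the deleted RowDigestResolver.resolve().
    static byte[] resolve(List<Response> replies) throws Exception
    {
        byte[] data = null;
        byte[] digest = null;
        for (Response reply : replies)
        {
            byte[] newDigest;
            if (reply.isDigest)
            {
                newDigest = reply.payload;
            }
            else
            {
                data = reply.payload; // multiple data replies are allowed; any one will do
                newDigest = digestOf(data);
            }

            if (digest == null)
                digest = newDigest;
            else if (!Arrays.equals(digest, newDigest))
                throw new IllegalStateException("digest mismatch"); // stands in for DigestMismatchException
        }
        return data;
    }

    public static void main(String[] args) throws Exception
    {
        byte[] row = "k1:v1".getBytes(StandardCharsets.UTF_8);
        List<Response> replies = Arrays.asList(new Response(false, row),            // data from the closest replica
                                               new Response(true, digestOf(row))); // digest from another replica
        System.out.println(resolve(replies).length + " bytes resolved, digests agree");
    }
}

Flip a byte in the data payload and resolve throws at the first comparison, which is the point where ReadCallback's async-repair runner falls back to full data reads.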
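Second, the new awaitResults() splits its verdict into two independent checks: whether the condition was signalled, and whether blockfor + failures > endpoints.size(). The failure test has to be re-evaluated after the wait because a failure response can itself be what signalled the condition. A compact sketch of that accounting, again under invented names (AwaitSketch, onResponse, onFailure) and with a CountDownLatch standing in for Cassandra's SimpleCondition:

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public final class AwaitSketch
{
    private final int blockfor;
    private final int endpoints;
    private final AtomicInteger received = new AtomicInteger();
    private final AtomicInteger failures = new AtomicInteger();
    private final CountDownLatch condition = new CountDownLatch(1); // plays the role of SimpleCondition

    public AwaitSketch(int blockfor, int endpoints)
    {
        this.blockfor = blockfor;
        this.endpoints = endpoints;
    }

    public void onResponse()
    {
        // enough replicas answered: wake the waiting coordinator
        if (received.incrementAndGet() >= blockfor)
            condition.countDown();
    }

    public void onFailure()
    {
        // once blockfor + failures > endpoints, success is impossible: wake the waiter early
        if (blockfor + failures.incrementAndGet() > endpoints)
            condition.countDown();
    }

    // Mirrors awaitResults(): success requires both the signal and a clean failure re-check.
    public String awaitResults(long timeoutMillis) throws InterruptedException
    {
        boolean signaled = condition.await(timeoutMillis, TimeUnit.MILLISECONDS);
        boolean failed = blockfor + failures.get() > endpoints;
        if (signaled && !failed)
            return "success";
        return failed ? "ReadFailureException" : "ReadTimeoutException";
    }
}

Note the ordering: failures is incremented before the condition is signalled, so a waiter that wakes up can trust its failed re-check.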
