Ensure CL guarantees on digest mismatch. Patch by slebresne; reviewed by jbellis for CASSANDRA-5113
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3d787b78 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3d787b78 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3d787b78 Branch: refs/heads/trunk Commit: 3d787b78c155773edcf29af8290ef1bea62a4206 Parents: af8a477 Author: Sylvain Lebresne <[email protected]> Authored: Mon Jan 7 11:11:45 2013 +0100 Committer: Sylvain Lebresne <[email protected]> Committed: Mon Jan 7 11:11:45 2013 +0100 ---------------------------------------------------------------------- CHANGES.txt | 1 + src/java/org/apache/cassandra/db/ReadCommand.java | 4 +- .../apache/cassandra/db/SliceFromReadCommand.java | 6 +- .../cassandra/service/AsyncRepairCallback.java | 4 +- .../cassandra/service/DatacenterReadCallback.java | 13 +- .../service/RangeSliceResponseResolver.java | 4 +- .../org/apache/cassandra/service/ReadCallback.java | 26 ++- .../apache/cassandra/service/RepairCallback.java | 86 ------- .../apache/cassandra/service/RowDataResolver.java | 182 +++++++++++++++ .../cassandra/service/RowDigestResolver.java | 2 +- .../cassandra/service/RowRepairResolver.java | 182 --------------- .../org/apache/cassandra/service/StorageProxy.java | 16 +- .../apache/cassandra/service/RowResolverTest.java | 20 +- 13 files changed, 247 insertions(+), 299 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/3d787b78/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 2ec66c9..95aad22 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -20,6 +20,7 @@ * cqlsh: Add default limit to SELECT statements (CASSANDRA-4972) * cqlsh: fix DESCRIBE for 1.1 cfs in CQL3 (CASSANDRA-5101) * Correctly gossip with nodes >= 1.1.7 (CASSANDRA-5102) + * Ensure CL guarantees on digest mismatch (CASSANDRA-5113) Merged from 1.1: * Pig: 
correctly decode row keys in widerow mode (CASSANDRA-5098) http://git-wip-us.apache.org/repos/asf/cassandra/blob/3d787b78/src/java/org/apache/cassandra/db/ReadCommand.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java index f3494e5..6c364cb 100644 --- a/src/java/org/apache/cassandra/db/ReadCommand.java +++ b/src/java/org/apache/cassandra/db/ReadCommand.java @@ -32,7 +32,7 @@ import org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.net.MessageOut; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.service.IReadCommand; -import org.apache.cassandra.service.RepairCallback; +import org.apache.cassandra.service.RowDataResolver; import org.apache.cassandra.utils.IFilter; @@ -94,7 +94,7 @@ public abstract class ReadCommand implements IReadCommand } // maybeGenerateRetryCommand is used to generate a retry for short reads - public ReadCommand maybeGenerateRetryCommand(RepairCallback handler, Row row) + public ReadCommand maybeGenerateRetryCommand(RowDataResolver resolver, Row row) { return null; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/3d787b78/src/java/org/apache/cassandra/db/SliceFromReadCommand.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/SliceFromReadCommand.java b/src/java/org/apache/cassandra/db/SliceFromReadCommand.java index d52826b..8a08a42 100644 --- a/src/java/org/apache/cassandra/db/SliceFromReadCommand.java +++ b/src/java/org/apache/cassandra/db/SliceFromReadCommand.java @@ -30,7 +30,7 @@ import org.apache.cassandra.db.filter.QueryFilter; import org.apache.cassandra.db.filter.QueryPath; import org.apache.cassandra.db.filter.SliceQueryFilter; import org.apache.cassandra.io.IVersionedSerializer; -import org.apache.cassandra.service.RepairCallback; +import 
org.apache.cassandra.service.RowDataResolver; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.thrift.ColumnParent; import org.apache.cassandra.utils.ByteBufferUtil; @@ -71,9 +71,9 @@ public class SliceFromReadCommand extends ReadCommand } @Override - public ReadCommand maybeGenerateRetryCommand(RepairCallback handler, Row row) + public ReadCommand maybeGenerateRetryCommand(RowDataResolver resolver, Row row) { - int maxLiveColumns = handler.getMaxLiveCount(); + int maxLiveColumns = resolver.getMaxLiveCount(); int count = filter.count; assert maxLiveColumns <= count; http://git-wip-us.apache.org/repos/asf/cassandra/blob/3d787b78/src/java/org/apache/cassandra/service/AsyncRepairCallback.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/AsyncRepairCallback.java b/src/java/org/apache/cassandra/service/AsyncRepairCallback.java index 675f61c..63b7df3 100644 --- a/src/java/org/apache/cassandra/service/AsyncRepairCallback.java +++ b/src/java/org/apache/cassandra/service/AsyncRepairCallback.java @@ -28,11 +28,11 @@ import org.apache.cassandra.utils.WrappedRunnable; public class AsyncRepairCallback implements IAsyncCallback { - private final RowRepairResolver repairResolver; + private final RowDataResolver repairResolver; private final int blockfor; protected final AtomicInteger received = new AtomicInteger(0); - public AsyncRepairCallback(RowRepairResolver repairResolver, int blockfor) + public AsyncRepairCallback(RowDataResolver repairResolver, int blockfor) { this.repairResolver = repairResolver; this.blockfor = blockfor; http://git-wip-us.apache.org/repos/asf/cassandra/blob/3d787b78/src/java/org/apache/cassandra/service/DatacenterReadCallback.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/DatacenterReadCallback.java b/src/java/org/apache/cassandra/service/DatacenterReadCallback.java 
index d125553..e1ae652 100644 --- a/src/java/org/apache/cassandra/service/DatacenterReadCallback.java +++ b/src/java/org/apache/cassandra/service/DatacenterReadCallback.java @@ -46,11 +46,22 @@ public class DatacenterReadCallback<TMessage, TResolved> extends ReadCallback<TM } }; - public DatacenterReadCallback(IResponseResolver resolver, ConsistencyLevel consistencyLevel, IReadCommand command, List<InetAddress> endpoints) + public DatacenterReadCallback(IResponseResolver<TMessage, TResolved> resolver, ConsistencyLevel consistencyLevel, IReadCommand command, List<InetAddress> endpoints) { super(resolver, consistencyLevel, command, endpoints); } + protected DatacenterReadCallback(IResponseResolver<TMessage, TResolved> resolver, ConsistencyLevel consistencyLevel, int blockfor, IReadCommand command, List<InetAddress> endpoints) + { + super(resolver, consistencyLevel, blockfor, command, endpoints); + } + + @Override + public ReadCallback<TMessage, TResolved> withNewResolver(IResponseResolver<TMessage, TResolved> newResolver) + { + return new DatacenterReadCallback(newResolver, consistencyLevel, blockfor, command, endpoints); + } + @Override protected void sortForConsistencyLevel(List<InetAddress> endpoints) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/3d787b78/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java b/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java index 0d24fbf..1dfd01e 100644 --- a/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java +++ b/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java @@ -138,7 +138,7 @@ public class RangeSliceResponseResolver implements IResponseResolver<RangeSliceR protected Row getReduced() { ColumnFamily resolved = versions.size() > 1 - ? RowRepairResolver.resolveSuperset(versions) + ? 
RowDataResolver.resolveSuperset(versions) : versions.get(0); if (versions.size() < sources.size()) { @@ -154,7 +154,7 @@ public class RangeSliceResponseResolver implements IResponseResolver<RangeSliceR } // resolved can be null even if versions doesn't have all nulls because of the call to removeDeleted in resolveSuperSet if (resolved != null) - repairResults.addAll(RowRepairResolver.scheduleRepairs(resolved, table, key, versions, versionSources)); + repairResults.addAll(RowDataResolver.scheduleRepairs(resolved, table, key, versions, versionSources)); versions.clear(); versionSources.clear(); return new Row(key, resolved); http://git-wip-us.apache.org/repos/asf/cassandra/blob/3d787b78/src/java/org/apache/cassandra/service/ReadCallback.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/ReadCallback.java b/src/java/org/apache/cassandra/service/ReadCallback.java index 8df2e10..e12859a 100644 --- a/src/java/org/apache/cassandra/service/ReadCallback.java +++ b/src/java/org/apache/cassandra/service/ReadCallback.java @@ -60,7 +60,7 @@ public class ReadCallback<TMessage, TResolved> implements IAsyncCallback<TMessag private final long startTime; protected final int blockfor; final List<InetAddress> endpoints; - private final IReadCommand command; + protected final IReadCommand command; protected final ConsistencyLevel consistencyLevel; protected final AtomicInteger received = new AtomicInteger(0); @@ -75,11 +75,26 @@ public class ReadCallback<TMessage, TResolved> implements IAsyncCallback<TMessag this.startTime = System.currentTimeMillis(); this.consistencyLevel = consistencyLevel; sortForConsistencyLevel(endpoints); - this.endpoints = resolver instanceof RowRepairResolver ? 
endpoints : filterEndpoints(endpoints); + this.endpoints = filterEndpoints(endpoints); if (logger.isTraceEnabled()) logger.trace(String.format("Blockfor is %s; setting up requests to %s", blockfor, StringUtils.join(this.endpoints, ","))); } + protected ReadCallback(IResponseResolver<TMessage, TResolved> resolver, ConsistencyLevel consistencyLevel, int blockfor, IReadCommand command, List<InetAddress> endpoints) + { + this.command = command; + this.blockfor = blockfor; + this.consistencyLevel = consistencyLevel; + this.resolver = resolver; + this.startTime = System.currentTimeMillis(); + this.endpoints = endpoints; + } + + public ReadCallback<TMessage, TResolved> withNewResolver(IResponseResolver<TMessage, TResolved> newResolver) + { + return new ReadCallback(newResolver, consistencyLevel, blockfor, command, endpoints); + } + /** * Endpoints is already restricted to live replicas, sorted by snitch preference. This is a hook for * DatacenterReadCallback to move local-DC replicas to the front of the list. We need this both @@ -209,17 +224,22 @@ public class ReadCallback<TMessage, TResolved> implements IAsyncCallback<TMessag { protected void runMayThrow() throws IOException { + // If the resolver is a RowDigestResolver, we need to do a full data read if there is a mismatch. 
+ // Otherwise, resolve will send the repairs directly if needs be (and in that case we should never + // get a digest mismatch) try { resolver.resolve(); } catch (DigestMismatchException e) { + assert resolver instanceof RowDigestResolver; + if (logger.isDebugEnabled()) logger.debug("Digest mismatch:", e); ReadCommand readCommand = (ReadCommand) command; - final RowRepairResolver repairResolver = new RowRepairResolver(readCommand.table, readCommand.key, readCommand.filter()); + final RowDataResolver repairResolver = new RowDataResolver(readCommand.table, readCommand.key, readCommand.filter()); IAsyncCallback repairHandler = new AsyncRepairCallback(repairResolver, endpoints.size()); MessageOut<ReadCommand> message = ((ReadCommand) command).createMessage(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/3d787b78/src/java/org/apache/cassandra/service/RepairCallback.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/RepairCallback.java b/src/java/org/apache/cassandra/service/RepairCallback.java deleted file mode 100644 index 9388328..0000000 --- a/src/java/org/apache/cassandra/service/RepairCallback.java +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.service; - -import java.io.IOException; -import java.net.InetAddress; -import java.util.List; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.db.Row; -import org.apache.cassandra.net.IAsyncCallback; -import org.apache.cassandra.net.MessageIn; -import org.apache.cassandra.utils.SimpleCondition; - -public class RepairCallback implements IAsyncCallback -{ - public final RowRepairResolver resolver; - private final List<InetAddress> endpoints; - private final SimpleCondition condition = new SimpleCondition(); - private final long startTime; - protected final AtomicInteger received = new AtomicInteger(0); - - /** - * The main difference between this and ReadCallback is, ReadCallback has a ConsistencyLevel - * it needs to achieve. Repair on the other hand is happy to repair whoever replies within the timeout. - * - * (The other main difference of course is, this is only created once we know we have a digest - * mismatch, and we're going to do full-data reads from everyone -- that is, this is the final - * stage in the read process.) - */ - public RepairCallback(RowRepairResolver resolver, List<InetAddress> endpoints) - { - this.resolver = resolver; - this.endpoints = endpoints; - this.startTime = System.currentTimeMillis(); - } - - public Row get() throws DigestMismatchException, IOException - { - long timeout = DatabaseDescriptor.getWriteRpcTimeout() - (System.currentTimeMillis() - startTime); - try - { - condition.await(timeout, TimeUnit.MILLISECONDS); - } - catch (InterruptedException ex) - { - throw new AssertionError(ex); - } - - return received.get() > 1 ? 
resolver.resolve() : null; - } - - public void response(MessageIn message) - { - resolver.preprocess(message); - if (received.incrementAndGet() == endpoints.size()) - condition.signal(); - } - - public boolean isLatencyForSnitch() - { - return true; - } - - public int getMaxLiveCount() - { - return resolver.getMaxLiveCount(); - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/3d787b78/src/java/org/apache/cassandra/service/RowDataResolver.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/RowDataResolver.java b/src/java/org/apache/cassandra/service/RowDataResolver.java new file mode 100644 index 0000000..5545293 --- /dev/null +++ b/src/java/org/apache/cassandra/service/RowDataResolver.java @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.cassandra.service; + +import java.io.IOException; +import java.net.InetAddress; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import com.google.common.collect.Iterables; + +import org.apache.cassandra.db.*; +import org.apache.cassandra.db.columniterator.IdentityQueryFilter; +import org.apache.cassandra.db.filter.IDiskAtomFilter; +import org.apache.cassandra.db.filter.QueryFilter; +import org.apache.cassandra.db.filter.QueryPath; +import org.apache.cassandra.db.filter.SliceQueryFilter; +import org.apache.cassandra.net.IAsyncResult; +import org.apache.cassandra.net.MessageIn; +import org.apache.cassandra.net.MessageOut; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.utils.CloseableIterator; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.IFilter; + +public class RowDataResolver extends AbstractRowResolver +{ + private int maxLiveCount = 0; + public List<IAsyncResult> repairResults = Collections.emptyList(); + private final IDiskAtomFilter filter; + + public RowDataResolver(String table, ByteBuffer key, IDiskAtomFilter qFilter) + { + super(key, table); + this.filter = qFilter; + } + + /* + * This method handles the following scenario: + * + * there was a mismatch on the initial read, so we redid the digest requests + * as full data reads. In this case we need to compute the most recent version + * of each column, and send diffs to out-of-date replicas. 
+ */ + public Row resolve() throws DigestMismatchException, IOException + { + if (logger.isDebugEnabled()) + logger.debug("resolving " + replies.size() + " responses"); + long startTime = System.currentTimeMillis(); + + ColumnFamily resolved; + if (replies.size() > 1) + { + List<ColumnFamily> versions = new ArrayList<ColumnFamily>(replies.size()); + List<InetAddress> endpoints = new ArrayList<InetAddress>(replies.size()); + + for (MessageIn<ReadResponse> message : replies) + { + ReadResponse response = message.payload; + ColumnFamily cf = response.row().cf; + assert !response.isDigestQuery() : "Received digest response to repair read from " + message.from; + versions.add(cf); + endpoints.add(message.from); + + // compute maxLiveCount to prevent short reads -- see https://issues.apache.org/jira/browse/CASSANDRA-2643 + int liveCount = cf == null ? 0 : filter.getLiveCount(cf); + if (liveCount > maxLiveCount) + maxLiveCount = liveCount; + } + + resolved = resolveSuperset(versions); + if (logger.isDebugEnabled()) + logger.debug("versions merged"); + + // send updates to any replica that was missing part of the full row + // (resolved can be null even if versions doesn't have all nulls because of the call to removeDeleted in resolveSuperSet) + if (resolved != null) + repairResults = scheduleRepairs(resolved, table, key, versions, endpoints); + } + else + { + resolved = replies.iterator().next().payload.row().cf; + } + + if (logger.isDebugEnabled()) + logger.debug("resolve: " + (System.currentTimeMillis() - startTime) + " ms."); + + return new Row(key, resolved); + } + + /** + * For each row version, compare with resolved (the superset of all row versions); + * if it is missing anything, send a mutation to the endpoint it come from. 
+ */ + public static List<IAsyncResult> scheduleRepairs(ColumnFamily resolved, String table, DecoratedKey key, List<ColumnFamily> versions, List<InetAddress> endpoints) + { + List<IAsyncResult> results = new ArrayList<IAsyncResult>(versions.size()); + + for (int i = 0; i < versions.size(); i++) + { + ColumnFamily diffCf = ColumnFamily.diff(versions.get(i), resolved); + if (diffCf == null) // no repair needs to happen + continue; + + // create and send the row mutation message based on the diff + RowMutation rowMutation = new RowMutation(table, key.key); + rowMutation.add(diffCf); + MessageOut repairMessage; + // use a separate verb here because we don't want these to be get the white glove hint- + // on-timeout behavior that a "real" mutation gets + repairMessage = rowMutation.createMessage(MessagingService.Verb.READ_REPAIR); + results.add(MessagingService.instance().sendRR(repairMessage, endpoints.get(i))); + } + + return results; + } + + static ColumnFamily resolveSuperset(Iterable<ColumnFamily> versions) + { + assert Iterables.size(versions) > 0; + + ColumnFamily resolved = null; + for (ColumnFamily cf : versions) + { + if (cf == null) + continue; + + if (resolved == null) + resolved = cf.cloneMeShallow(); + else + resolved.delete(cf); + } + if (resolved == null) + return null; + + // mimic the collectCollatedColumn + removeDeleted path that getColumnFamily takes. + // this will handle removing columns and subcolumns that are supressed by a row or + // supercolumn tombstone. 
+ QueryFilter filter = new QueryFilter(null, new QueryPath(resolved.metadata().cfName), new IdentityQueryFilter()); + List<CloseableIterator<IColumn>> iters = new ArrayList<CloseableIterator<IColumn>>(); + for (ColumnFamily version : versions) + { + if (version == null) + continue; + iters.add(FBUtilities.closeableIterator(version.iterator())); + } + filter.collateColumns(resolved, iters, Integer.MIN_VALUE); + return ColumnFamilyStore.removeDeleted(resolved, Integer.MIN_VALUE); + } + + public Row getData() throws IOException + { + return replies.iterator().next().payload.row(); + } + + public boolean isDataPresent() + { + return !replies.isEmpty(); + } + + public int getMaxLiveCount() + { + return maxLiveCount; + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/3d787b78/src/java/org/apache/cassandra/service/RowDigestResolver.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/RowDigestResolver.java b/src/java/org/apache/cassandra/service/RowDigestResolver.java index e0e262b..eeccbeb 100644 --- a/src/java/org/apache/cassandra/service/RowDigestResolver.java +++ b/src/java/org/apache/cassandra/service/RowDigestResolver.java @@ -95,7 +95,7 @@ public class RowDigestResolver extends AbstractRowResolver // with the data response. If there is a mismatch then throw an exception so that read repair can happen. // // It's important to note that we do not consider the possibility of multiple data responses -- - // that can only happen when we're doing the repair post-mismatch, and will be handled by RowRepairResolver. + // that can only happen when we're doing the repair post-mismatch, and will be handled by RowDataResolver. 
if (digest != null) { ByteBuffer digest2 = ColumnFamily.digest(data); http://git-wip-us.apache.org/repos/asf/cassandra/blob/3d787b78/src/java/org/apache/cassandra/service/RowRepairResolver.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/RowRepairResolver.java b/src/java/org/apache/cassandra/service/RowRepairResolver.java deleted file mode 100644 index 21cf5ab..0000000 --- a/src/java/org/apache/cassandra/service/RowRepairResolver.java +++ /dev/null @@ -1,182 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.cassandra.service; - -import java.io.IOException; -import java.net.InetAddress; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; - -import com.google.common.collect.Iterables; - -import org.apache.cassandra.db.*; -import org.apache.cassandra.db.columniterator.IdentityQueryFilter; -import org.apache.cassandra.db.filter.IDiskAtomFilter; -import org.apache.cassandra.db.filter.QueryFilter; -import org.apache.cassandra.db.filter.QueryPath; -import org.apache.cassandra.db.filter.SliceQueryFilter; -import org.apache.cassandra.net.IAsyncResult; -import org.apache.cassandra.net.MessageIn; -import org.apache.cassandra.net.MessageOut; -import org.apache.cassandra.net.MessagingService; -import org.apache.cassandra.utils.CloseableIterator; -import org.apache.cassandra.utils.FBUtilities; -import org.apache.cassandra.utils.IFilter; - -public class RowRepairResolver extends AbstractRowResolver -{ - private int maxLiveCount = 0; - public List<IAsyncResult> repairResults = Collections.emptyList(); - private final IDiskAtomFilter filter; - - public RowRepairResolver(String table, ByteBuffer key, IDiskAtomFilter qFilter) - { - super(key, table); - this.filter = qFilter; - } - - /* - * This method handles the following scenario: - * - * there was a mismatch on the initial read, so we redid the digest requests - * as full data reads. In this case we need to compute the most recent version - * of each column, and send diffs to out-of-date replicas. 
- */ - public Row resolve() throws DigestMismatchException, IOException - { - if (logger.isDebugEnabled()) - logger.debug("resolving " + replies.size() + " responses"); - long startTime = System.currentTimeMillis(); - - ColumnFamily resolved; - if (replies.size() > 1) - { - List<ColumnFamily> versions = new ArrayList<ColumnFamily>(replies.size()); - List<InetAddress> endpoints = new ArrayList<InetAddress>(replies.size()); - - for (MessageIn<ReadResponse> message : replies) - { - ReadResponse response = message.payload; - ColumnFamily cf = response.row().cf; - assert !response.isDigestQuery() : "Received digest response to repair read from " + message.from; - versions.add(cf); - endpoints.add(message.from); - - // compute maxLiveCount to prevent short reads -- see https://issues.apache.org/jira/browse/CASSANDRA-2643 - int liveCount = cf == null ? 0 : filter.getLiveCount(cf); - if (liveCount > maxLiveCount) - maxLiveCount = liveCount; - } - - resolved = resolveSuperset(versions); - if (logger.isDebugEnabled()) - logger.debug("versions merged"); - - // send updates to any replica that was missing part of the full row - // (resolved can be null even if versions doesn't have all nulls because of the call to removeDeleted in resolveSuperSet) - if (resolved != null) - repairResults = scheduleRepairs(resolved, table, key, versions, endpoints); - } - else - { - resolved = replies.iterator().next().payload.row().cf; - } - - if (logger.isDebugEnabled()) - logger.debug("resolve: " + (System.currentTimeMillis() - startTime) + " ms."); - - return new Row(key, resolved); - } - - /** - * For each row version, compare with resolved (the superset of all row versions); - * if it is missing anything, send a mutation to the endpoint it come from. 
- */ - public static List<IAsyncResult> scheduleRepairs(ColumnFamily resolved, String table, DecoratedKey key, List<ColumnFamily> versions, List<InetAddress> endpoints) - { - List<IAsyncResult> results = new ArrayList<IAsyncResult>(versions.size()); - - for (int i = 0; i < versions.size(); i++) - { - ColumnFamily diffCf = ColumnFamily.diff(versions.get(i), resolved); - if (diffCf == null) // no repair needs to happen - continue; - - // create and send the row mutation message based on the diff - RowMutation rowMutation = new RowMutation(table, key.key); - rowMutation.add(diffCf); - MessageOut repairMessage; - // use a separate verb here because we don't want these to be get the white glove hint- - // on-timeout behavior that a "real" mutation gets - repairMessage = rowMutation.createMessage(MessagingService.Verb.READ_REPAIR); - results.add(MessagingService.instance().sendRR(repairMessage, endpoints.get(i))); - } - - return results; - } - - static ColumnFamily resolveSuperset(Iterable<ColumnFamily> versions) - { - assert Iterables.size(versions) > 0; - - ColumnFamily resolved = null; - for (ColumnFamily cf : versions) - { - if (cf == null) - continue; - - if (resolved == null) - resolved = cf.cloneMeShallow(); - else - resolved.delete(cf); - } - if (resolved == null) - return null; - - // mimic the collectCollatedColumn + removeDeleted path that getColumnFamily takes. - // this will handle removing columns and subcolumns that are supressed by a row or - // supercolumn tombstone. 
- QueryFilter filter = new QueryFilter(null, new QueryPath(resolved.metadata().cfName), new IdentityQueryFilter()); - List<CloseableIterator<IColumn>> iters = new ArrayList<CloseableIterator<IColumn>>(); - for (ColumnFamily version : versions) - { - if (version == null) - continue; - iters.add(FBUtilities.closeableIterator(version.iterator())); - } - filter.collateColumns(resolved, iters, Integer.MIN_VALUE); - return ColumnFamilyStore.removeDeleted(resolved, Integer.MIN_VALUE); - } - - public Row getData() throws IOException - { - throw new UnsupportedOperationException(); - } - - public boolean isDataPresent() - { - throw new UnsupportedOperationException(); - } - - public int getMaxLiveCount() - { - return maxLiveCount; - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/3d787b78/src/java/org/apache/cassandra/service/StorageProxy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java index fe427af..0fb7ec4 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -923,7 +923,7 @@ public class StorageProxy implements StorageProxyMBean // read results and make a second pass for any digest mismatches List<ReadCommand> repairCommands = null; - List<RepairCallback> repairResponseHandlers = null; + List<ReadCallback<ReadResponse, Row>> repairResponseHandlers = null; for (int i = 0; i < commands.size(); i++) { ReadCallback<ReadResponse, Row> handler = readCallbacks[i]; @@ -946,13 +946,14 @@ public class StorageProxy implements StorageProxyMBean catch (DigestMismatchException ex) { logger.debug("Digest mismatch: {}", ex.toString()); - RowRepairResolver resolver = new RowRepairResolver(command.table, command.key, command.filter()); - RepairCallback repairHandler = new RepairCallback(resolver, handler.endpoints); + // Do a full data read to 
resolve the correct response (and repair node that need be) + RowDataResolver resolver = new RowDataResolver(command.table, command.key, command.filter()); + ReadCallback<ReadResponse, Row> repairHandler = handler.withNewResolver(resolver); if (repairCommands == null) { repairCommands = new ArrayList<ReadCommand>(); - repairResponseHandlers = new ArrayList<RepairCallback>(); + repairResponseHandlers = new ArrayList<ReadCallback<ReadResponse, Row>>(); } repairCommands.add(command); repairResponseHandlers.add(repairHandler); @@ -974,7 +975,7 @@ public class StorageProxy implements StorageProxyMBean for (int i = 0; i < repairCommands.size(); i++) { ReadCommand command = repairCommands.get(i); - RepairCallback handler = repairResponseHandlers.get(i); + ReadCallback<ReadResponse, Row> handler = repairResponseHandlers.get(i); Row row; try @@ -986,11 +987,12 @@ public class StorageProxy implements StorageProxyMBean throw new AssertionError(e); // full data requested from each node here, no digests should be sent } + RowDataResolver resolver = (RowDataResolver)handler.resolver; try { // wait for the repair writes to be acknowledged, to minimize impact on any replica that's // behind on writes in case the out-of-sync row is read multiple times in quick succession - FBUtilities.waitOnFutures(handler.resolver.repairResults, DatabaseDescriptor.getWriteRpcTimeout()); + FBUtilities.waitOnFutures(resolver.repairResults, DatabaseDescriptor.getWriteRpcTimeout()); } catch (TimeoutException e) { @@ -999,7 +1001,7 @@ public class StorageProxy implements StorageProxyMBean } // retry any potential short reads - ReadCommand retryCommand = command.maybeGenerateRetryCommand(handler, row); + ReadCommand retryCommand = command.maybeGenerateRetryCommand(resolver, row); if (retryCommand != null) { logger.debug("Issuing retry for read command"); http://git-wip-us.apache.org/repos/asf/cassandra/blob/3d787b78/test/unit/org/apache/cassandra/service/RowResolverTest.java 
---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/RowResolverTest.java b/test/unit/org/apache/cassandra/service/RowResolverTest.java index 3c530f1..2cc7860 100644 --- a/test/unit/org/apache/cassandra/service/RowResolverTest.java +++ b/test/unit/org/apache/cassandra/service/RowResolverTest.java @@ -46,7 +46,7 @@ public class RowResolverTest extends SchemaLoader ColumnFamily cf2 = ColumnFamily.create("Keyspace1", "Standard1"); cf2.addColumn(column("c1", "v2", 1)); - ColumnFamily resolved = RowRepairResolver.resolveSuperset(Arrays.asList(cf1, cf2)); + ColumnFamily resolved = RowDataResolver.resolveSuperset(Arrays.asList(cf1, cf2)); assertColumns(resolved, "c1"); assertColumns(ColumnFamily.diff(cf1, resolved), "c1"); assertNull(ColumnFamily.diff(cf2, resolved)); @@ -61,7 +61,7 @@ public class RowResolverTest extends SchemaLoader ColumnFamily cf2 = ColumnFamily.create("Keyspace1", "Standard1"); cf2.addColumn(column("c2", "v2", 1)); - ColumnFamily resolved = RowRepairResolver.resolveSuperset(Arrays.asList(cf1, cf2)); + ColumnFamily resolved = RowDataResolver.resolveSuperset(Arrays.asList(cf1, cf2)); assertColumns(resolved, "c1", "c2"); assertColumns(ColumnFamily.diff(cf1, resolved), "c2"); assertColumns(ColumnFamily.diff(cf2, resolved), "c1"); @@ -73,7 +73,7 @@ public class RowResolverTest extends SchemaLoader ColumnFamily cf2 = ColumnFamily.create("Keyspace1", "Standard1"); cf2.addColumn(column("c2", "v2", 1)); - ColumnFamily resolved = RowRepairResolver.resolveSuperset(Arrays.asList(null, cf2)); + ColumnFamily resolved = RowDataResolver.resolveSuperset(Arrays.asList(null, cf2)); assertColumns(resolved, "c2"); assertColumns(ColumnFamily.diff(null, resolved), "c2"); assertNull(ColumnFamily.diff(cf2, resolved)); @@ -85,7 +85,7 @@ public class RowResolverTest extends SchemaLoader ColumnFamily cf1 = ColumnFamily.create("Keyspace1", "Standard1"); cf1.addColumn(column("c1", "v1", 0)); - ColumnFamily 
resolved = RowRepairResolver.resolveSuperset(Arrays.asList(cf1, null)); + ColumnFamily resolved = RowDataResolver.resolveSuperset(Arrays.asList(cf1, null)); assertColumns(resolved, "c1"); assertNull(ColumnFamily.diff(cf1, resolved)); assertColumns(ColumnFamily.diff(null, resolved), "c1"); @@ -94,7 +94,7 @@ public class RowResolverTest extends SchemaLoader @Test public void testResolveSupersetNullBoth() { - assertNull(RowRepairResolver.resolveSuperset(Arrays.<ColumnFamily>asList(null, null))); + assertNull(RowDataResolver.resolveSuperset(Arrays.<ColumnFamily>asList(null, null))); } @Test @@ -107,7 +107,7 @@ public class RowResolverTest extends SchemaLoader ColumnFamily cf2 = ColumnFamily.create("Keyspace1", "Standard1"); cf2.delete(new DeletionInfo(1L, (int) (System.currentTimeMillis() / 1000))); - ColumnFamily resolved = RowRepairResolver.resolveSuperset(Arrays.asList(cf1, cf2)); + ColumnFamily resolved = RowDataResolver.resolveSuperset(Arrays.asList(cf1, cf2)); // no columns in the cf assertColumns(resolved); assertTrue(resolved.isMarkedForDelete()); @@ -119,7 +119,7 @@ public class RowResolverTest extends SchemaLoader ColumnFamily scf2 = ColumnFamily.create("Keyspace1", "Super1"); scf2.delete(new DeletionInfo(1L, (int) (System.currentTimeMillis() / 1000))); - ColumnFamily superResolved = RowRepairResolver.resolveSuperset(Arrays.asList(scf1, scf2)); + ColumnFamily superResolved = RowDataResolver.resolveSuperset(Arrays.asList(scf1, scf2)); // no columns in the cf assertColumns(superResolved); assertTrue(superResolved.isMarkedForDelete()); @@ -138,7 +138,7 @@ public class RowResolverTest extends SchemaLoader ColumnFamily scf2 = ColumnFamily.create("Keyspace1", "Super1"); scf2.delete(new DeletionInfo(2L, (int) (System.currentTimeMillis() / 1000))); - ColumnFamily superResolved = RowRepairResolver.resolveSuperset(Arrays.asList(scf1, scf2)); + ColumnFamily superResolved = RowDataResolver.resolveSuperset(Arrays.asList(scf1, scf2)); // no columns in the cf 
assertColumns(superResolved); assertTrue(superResolved.isMarkedForDelete()); @@ -165,7 +165,7 @@ public class RowResolverTest extends SchemaLoader ColumnFamily cf4 = ColumnFamily.create("Keyspace1", "Standard1"); cf4.delete(new DeletionInfo(2L, (int) (System.currentTimeMillis() / 1000))); - ColumnFamily resolved = RowRepairResolver.resolveSuperset(Arrays.asList(cf1, cf2, cf3, cf4)); + ColumnFamily resolved = RowDataResolver.resolveSuperset(Arrays.asList(cf1, cf2, cf3, cf4)); // will have deleted marker and one column assertColumns(resolved, "two"); assertColumn(resolved, "two", "B", 3); @@ -188,7 +188,7 @@ public class RowResolverTest extends SchemaLoader ColumnFamily scf4 = ColumnFamily.create("Keyspace1", "Super1"); scf4.delete(new DeletionInfo(2L, (int) (System.currentTimeMillis() / 1000))); - ColumnFamily superResolved = RowRepairResolver.resolveSuperset(Arrays.asList(scf1, scf2, scf3, scf4)); + ColumnFamily superResolved = RowDataResolver.resolveSuperset(Arrays.asList(scf1, scf2, scf3, scf4)); // will have deleted marker and two super cols assertColumns(superResolved, "super1", "super2");
