Updated Branches:
refs/heads/trunk f6df04dbd -> 94d76aa6c
Merge branch 'cassandra-1.2' into trunk
Conflicts:
src/java/org/apache/cassandra/db/SliceFromReadCommand.java
test/unit/org/apache/cassandra/service/RowResolverTest.java
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/94d76aa6
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/94d76aa6
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/94d76aa6
Branch: refs/heads/trunk
Commit: 94d76aa6c6a63d22fead4adb2df725af75274f7f
Parents: f6df04d 3d787b7
Author: Sylvain Lebresne <[email protected]>
Authored: Mon Jan 7 11:14:36 2013 +0100
Committer: Sylvain Lebresne <[email protected]>
Committed: Mon Jan 7 11:14:36 2013 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
src/java/org/apache/cassandra/db/ReadCommand.java | 4 +-
.../apache/cassandra/db/SliceFromReadCommand.java | 6 +-
.../cassandra/service/AsyncRepairCallback.java | 4 +-
.../cassandra/service/DatacenterReadCallback.java | 13 +-
.../service/RangeSliceResponseResolver.java | 4 +-
.../org/apache/cassandra/service/ReadCallback.java | 26 ++-
.../apache/cassandra/service/RepairCallback.java | 86 -------
.../apache/cassandra/service/RowDataResolver.java | 181 +++++++++++++++
.../cassandra/service/RowDigestResolver.java | 2 +-
.../cassandra/service/RowRepairResolver.java | 181 ---------------
.../org/apache/cassandra/service/StorageProxy.java | 16 +-
.../apache/cassandra/service/RowResolverTest.java | 14 +-
13 files changed, 243 insertions(+), 295 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/94d76aa6/CHANGES.txt
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/94d76aa6/src/java/org/apache/cassandra/db/ReadCommand.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/94d76aa6/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/SliceFromReadCommand.java
index 3a05ce0,8a08a42..523646d
--- a/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
@@@ -31,12 -29,10 +31,12 @@@ import org.apache.cassandra.db.filter.I
import org.apache.cassandra.db.filter.QueryFilter;
import org.apache.cassandra.db.filter.QueryPath;
import org.apache.cassandra.db.filter.SliceQueryFilter;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.CompositeType;
import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.net.MessagingService;
- import org.apache.cassandra.service.RepairCallback;
+ import org.apache.cassandra.service.RowDataResolver;
import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.thrift.ColumnParent;
import org.apache.cassandra.utils.ByteBufferUtil;
public class SliceFromReadCommand extends ReadCommand
http://git-wip-us.apache.org/repos/asf/cassandra/blob/94d76aa6/src/java/org/apache/cassandra/service/RowDataResolver.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/RowDataResolver.java
index 0000000,5545293..e04d7c5
mode 000000,100644..100644
--- a/src/java/org/apache/cassandra/service/RowDataResolver.java
+++ b/src/java/org/apache/cassandra/service/RowDataResolver.java
@@@ -1,0 -1,182 +1,181 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ package org.apache.cassandra.service;
+
+ import java.io.IOException;
+ import java.net.InetAddress;
+ import java.nio.ByteBuffer;
+ import java.util.ArrayList;
+ import java.util.Collections;
+ import java.util.List;
+
+ import com.google.common.collect.Iterables;
+
+ import org.apache.cassandra.db.*;
+ import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
+ import org.apache.cassandra.db.filter.IDiskAtomFilter;
+ import org.apache.cassandra.db.filter.QueryFilter;
-import org.apache.cassandra.db.filter.QueryPath;
+ import org.apache.cassandra.db.filter.SliceQueryFilter;
+ import org.apache.cassandra.net.IAsyncResult;
+ import org.apache.cassandra.net.MessageIn;
+ import org.apache.cassandra.net.MessageOut;
+ import org.apache.cassandra.net.MessagingService;
+ import org.apache.cassandra.utils.CloseableIterator;
+ import org.apache.cassandra.utils.FBUtilities;
+ import org.apache.cassandra.utils.IFilter;
+
+ public class RowDataResolver extends AbstractRowResolver
+ {
+ private int maxLiveCount = 0;
+ public List<IAsyncResult> repairResults = Collections.emptyList();
+ private final IDiskAtomFilter filter;
+
+ public RowDataResolver(String table, ByteBuffer key, IDiskAtomFilter
qFilter)
+ {
+ super(key, table);
+ this.filter = qFilter;
+ }
+
+ /*
+ * This method handles the following scenario:
+ *
+ * there was a mismatch on the initial read, so we redid the digest
requests
+ * as full data reads. In this case we need to compute the most recent
version
+ * of each column, and send diffs to out-of-date replicas.
+ */
+ public Row resolve() throws DigestMismatchException, IOException
+ {
+ if (logger.isDebugEnabled())
+ logger.debug("resolving " + replies.size() + " responses");
+ long startTime = System.currentTimeMillis();
+
+ ColumnFamily resolved;
+ if (replies.size() > 1)
+ {
+ List<ColumnFamily> versions = new
ArrayList<ColumnFamily>(replies.size());
+ List<InetAddress> endpoints = new
ArrayList<InetAddress>(replies.size());
+
+ for (MessageIn<ReadResponse> message : replies)
+ {
+ ReadResponse response = message.payload;
+ ColumnFamily cf = response.row().cf;
+ assert !response.isDigestQuery() : "Received digest response
to repair read from " + message.from;
+ versions.add(cf);
+ endpoints.add(message.from);
+
+ // compute maxLiveCount to prevent short reads -- see
https://issues.apache.org/jira/browse/CASSANDRA-2643
+ int liveCount = cf == null ? 0 : filter.getLiveCount(cf);
+ if (liveCount > maxLiveCount)
+ maxLiveCount = liveCount;
+ }
+
+ resolved = resolveSuperset(versions);
+ if (logger.isDebugEnabled())
+ logger.debug("versions merged");
+
+ // send updates to any replica that was missing part of the full
row
+ // (resolved can be null even if versions doesn't have all nulls
because of the call to removeDeleted in resolveSuperset)
+ if (resolved != null)
+ repairResults = scheduleRepairs(resolved, table, key,
versions, endpoints);
+ }
+ else
+ {
+ resolved = replies.iterator().next().payload.row().cf;
+ }
+
+ if (logger.isDebugEnabled())
+ logger.debug("resolve: " + (System.currentTimeMillis() -
startTime) + " ms.");
+
+ return new Row(key, resolved);
+ }
+
+ /**
+ * For each row version, compare with resolved (the superset of all row
versions);
+ * if it is missing anything, send a mutation to the endpoint it came
from.
+ */
+ public static List<IAsyncResult> scheduleRepairs(ColumnFamily resolved,
String table, DecoratedKey key, List<ColumnFamily> versions, List<InetAddress>
endpoints)
+ {
+ List<IAsyncResult> results = new
ArrayList<IAsyncResult>(versions.size());
+
+ for (int i = 0; i < versions.size(); i++)
+ {
+ ColumnFamily diffCf = ColumnFamily.diff(versions.get(i),
resolved);
+ if (diffCf == null) // no repair needs to happen
+ continue;
+
+ // create and send the row mutation message based on the diff
+ RowMutation rowMutation = new RowMutation(table, key.key);
+ rowMutation.add(diffCf);
+ MessageOut repairMessage;
+ // use a separate verb here because we don't want these to get
the white glove hint-
+ // on-timeout behavior that a "real" mutation gets
+ repairMessage =
rowMutation.createMessage(MessagingService.Verb.READ_REPAIR);
+ results.add(MessagingService.instance().sendRR(repairMessage,
endpoints.get(i)));
+ }
+
+ return results;
+ }
+
+ static ColumnFamily resolveSuperset(Iterable<ColumnFamily> versions)
+ {
+ assert Iterables.size(versions) > 0;
+
+ ColumnFamily resolved = null;
+ for (ColumnFamily cf : versions)
+ {
+ if (cf == null)
+ continue;
+
+ if (resolved == null)
+ resolved = cf.cloneMeShallow();
+ else
+ resolved.delete(cf);
+ }
+ if (resolved == null)
+ return null;
+
+ // mimic the collectCollatedColumn + removeDeleted path that
getColumnFamily takes.
+ // this will handle removing columns and subcolumns that are
suppressed by a row or
+ // supercolumn tombstone.
- QueryFilter filter = new QueryFilter(null, new
QueryPath(resolved.metadata().cfName), new IdentityQueryFilter());
- List<CloseableIterator<IColumn>> iters = new
ArrayList<CloseableIterator<IColumn>>();
++ QueryFilter filter = new QueryFilter(null,
resolved.metadata().cfName, new IdentityQueryFilter());
++ List<CloseableIterator<Column>> iters = new
ArrayList<CloseableIterator<Column>>();
+ for (ColumnFamily version : versions)
+ {
+ if (version == null)
+ continue;
+ iters.add(FBUtilities.closeableIterator(version.iterator()));
+ }
+ filter.collateColumns(resolved, iters, Integer.MIN_VALUE);
+ return ColumnFamilyStore.removeDeleted(resolved, Integer.MIN_VALUE);
+ }
+
+ public Row getData() throws IOException
+ {
+ return replies.iterator().next().payload.row();
+ }
+
+ public boolean isDataPresent()
+ {
+ return !replies.isEmpty();
+ }
+
+ public int getMaxLiveCount()
+ {
+ return maxLiveCount;
+ }
+ }
http://git-wip-us.apache.org/repos/asf/cassandra/blob/94d76aa6/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/94d76aa6/test/unit/org/apache/cassandra/service/RowResolverTest.java
----------------------------------------------------------------------