Author: jbellis
Date: Wed Jan 5 07:00:33 2011
New Revision: 1055320
URL: http://svn.apache.org/viewvc?rev=1055320&view=rev
Log:
rename [Datacenter]QuorumResponseHandler -> [Datacenter]ReadCallback
patch by jbellis
Added:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/DatacenterReadCallback.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ReadCallback.java
Removed:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/DatacenterQuorumResponseHandler.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/QuorumResponseHandler.java
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java
cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/service/ConsistencyLevelTest.java
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java?rev=1055320&r1=1055319&r2=1055320&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
Wed Jan 5 07:00:33 2011
@@ -223,15 +223,6 @@ public abstract class AbstractReplicatio
return getAddressRanges(temp).get(pendingAddress);
}
- public QuorumResponseHandler getQuorumResponseHandler(IResponseResolver
responseResolver, ConsistencyLevel consistencyLevel)
- {
- if (consistencyLevel.equals(ConsistencyLevel.LOCAL_QUORUM) ||
consistencyLevel.equals(ConsistencyLevel.EACH_QUORUM))
- {
- return new DatacenterQuorumResponseHandler(responseResolver,
consistencyLevel, table);
- }
- return new QuorumResponseHandler(responseResolver, consistencyLevel,
table);
- }
-
public void invalidateCachedTokenEndpointValues()
{
clearEndpointCache();
Added:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/DatacenterReadCallback.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/DatacenterReadCallback.java?rev=1055320&view=auto
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/DatacenterReadCallback.java
(added)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/DatacenterReadCallback.java
Wed Jan 5 07:00:33 2011
@@ -0,0 +1,88 @@
+package org.apache.cassandra.service;
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+import java.net.InetAddress;
+import java.util.Collection;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.locator.IEndpointSnitch;
+import org.apache.cassandra.locator.NetworkTopologyStrategy;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.thrift.ConsistencyLevel;
+import org.apache.cassandra.thrift.UnavailableException;
+import org.apache.cassandra.utils.FBUtilities;
+
+/**
+ * Read callback for datacenter-aware reads: blocks until a quorum of the
+ * replicas in the local datacenter have responded and the resolver reports
+ * that a data response is present.
+ */
+public class DatacenterReadCallback<T> extends ReadCallback<T>
+{
+    private static final IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
+    private static final String localdc = snitch.getDatacenter(FBUtilities.getLocalAddress());
+
+    /** Local-datacenter responses still outstanding; starts at blockfor and counts down. */
+    private final AtomicInteger localResponses;
+
+    /**
+     * @param resolver         resolver that collects and reconciles the replica responses
+     * @param consistencyLevel consistency level requested for the read
+     * @param table            name of the table (keyspace) being read
+     */
+    public DatacenterReadCallback(IResponseResolver<T> resolver, ConsistencyLevel consistencyLevel, String table)
+    {
+        super(resolver, consistencyLevel, table);
+        localResponses = new AtomicInteger(blockfor);
+    }
+
+    /**
+     * Records a replica response and wakes the waiting reader once enough
+     * local-datacenter replicas have answered and data is available.
+     *
+     * @param message the response received from a replica
+     */
+    @Override
+    public void response(Message message)
+    {
+        resolver.preprocess(message);
+
+        // Only replies coming from the local datacenter count toward the quorum.
+        int remaining = localdc.equals(snitch.getDatacenter(message.getFrom()))
+                      ? localResponses.decrementAndGet()
+                      : localResponses.get();
+
+        // Compare with <= 0, not == 0: the counter goes negative when more local
+        // replicas reply than blockfor. With == 0, a data response that arrives
+        // after the counter has passed zero would never signal the waiter and
+        // the read would time out despite having everything it needs.
+        if (remaining <= 0 && resolver.isDataPresent())
+        {
+            condition.signal();
+        }
+    }
+
+    /**
+     * @return a quorum of the local datacenter's replication factor, i.e. (RF / 2) + 1
+     */
+    @Override
+    public int determineBlockFor(ConsistencyLevel consistency_level, String table)
+    {
+        // The cast fails with ClassCastException if the table does not use NetworkTopologyStrategy.
+        NetworkTopologyStrategy strategy = (NetworkTopologyStrategy) Table.open(table).getReplicationStrategy();
+        return (strategy.getReplicationFactor(localdc) / 2) + 1;
+    }
+
+    /**
+     * Verifies that enough of the given endpoints are in the local datacenter
+     * to satisfy blockfor.
+     *
+     * @param endpoints the live endpoints the read would be sent to
+     * @throws UnavailableException if fewer than blockfor of them are in the local datacenter
+     */
+    @Override
+    public void assureSufficientLiveNodes(Collection<InetAddress> endpoints) throws UnavailableException
+    {
+        int localEndpoints = 0;
+        for (InetAddress endpoint : endpoints)
+        {
+            if (localdc.equals(snitch.getDatacenter(endpoint)))
+                localEndpoints++;
+        }
+
+        if (localEndpoints < blockfor)
+            throw new UnavailableException();
+    }
+}
Added:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ReadCallback.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ReadCallback.java?rev=1055320&view=auto
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ReadCallback.java
(added)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ReadCallback.java
Wed Jan 5 07:00:33 2011
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.Collection;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.net.IAsyncCallback;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.thrift.ConsistencyLevel;
+import org.apache.cassandra.thrift.UnavailableException;
+import org.apache.cassandra.utils.SimpleCondition;
+
+/**
+ * Callback for read requests: collects replica responses through a resolver
+ * and lets the caller block in {@link #get()} until enough responses for the
+ * requested consistency level have arrived.
+ */
+public class ReadCallback<T> implements IAsyncCallback
+{
+    protected static final Logger logger = LoggerFactory.getLogger(ReadCallback.class);
+
+    public final IResponseResolver<T> resolver;
+    protected final SimpleCondition condition = new SimpleCondition();
+    private final long startTime;
+    /** Number of responses to wait for before the read may complete. */
+    protected final int blockfor;
+
+    /**
+     * Constructor when response count has to be calculated and blocked for.
+     *
+     * @param resolver         resolver that collects and reconciles the replica responses
+     * @param consistencyLevel consistency level requested for the read
+     * @param table            name of the table (keyspace) being read
+     */
+    public ReadCallback(IResponseResolver<T> resolver, ConsistencyLevel consistencyLevel, String table)
+    {
+        // NOTE: determineBlockFor is overridable and is invoked before any subclass
+        // constructor body has run, so overrides must not rely on subclass instance state.
+        this.blockfor = determineBlockFor(consistencyLevel, table);
+        this.resolver = resolver;
+        this.startTime = System.currentTimeMillis();
+
+        logger.debug("ReadCallback blocking for {} responses", blockfor);
+    }
+
+    /**
+     * Waits for the required responses and returns the result.
+     * The wait is bounded by the RPC timeout measured from the creation of this callback.
+     *
+     * @return the data from the resolver when blockfor is 1, otherwise the resolved result
+     * @throws TimeoutException        if the condition was not signalled within the remaining time
+     * @throws DigestMismatchException propagated from the resolver
+     * @throws IOException             propagated from the resolver
+     */
+    public T get() throws TimeoutException, DigestMismatchException, IOException
+    {
+        long timeout = DatabaseDescriptor.getRpcTimeout() - (System.currentTimeMillis() - startTime);
+        boolean success;
+        try
+        {
+            success = condition.await(timeout, TimeUnit.MILLISECONDS);
+        }
+        catch (InterruptedException ex)
+        {
+            // Restore the interrupt flag so the interruption stays observable
+            // further up the stack; the thrown error type is unchanged.
+            Thread.currentThread().interrupt();
+            throw new AssertionError(ex);
+        }
+
+        if (!success)
+        {
+            // List the endpoints that did answer, comma separated without a trailing separator.
+            StringBuilder sb = new StringBuilder();
+            for (Message message : resolver.getMessages())
+            {
+                if (sb.length() > 0)
+                    sb.append(", ");
+                sb.append(message.getFrom());
+            }
+            throw new TimeoutException("Operation timed out - received only " + resolver.getMessageCount() + " responses from " + sb + ".");
+        }
+
+        return blockfor == 1 ? resolver.getData() : resolver.resolve();
+    }
+
+    /**
+     * Removes the registered messaging callback for every response the resolver holds.
+     */
+    public void close()
+    {
+        for (Message response : resolver.getMessages())
+        {
+            MessagingService.removeRegisteredCallback(response.getMessageId());
+        }
+    }
+
+    /**
+     * Records a replica response and signals the waiting reader once at least
+     * blockfor responses have been received and the resolver reports data present.
+     *
+     * @param message the response received from a replica
+     */
+    public void response(Message message)
+    {
+        resolver.preprocess(message);
+        if (resolver.getMessageCount() < blockfor)
+            return;
+        if (resolver.isDataPresent())
+            condition.signal();
+    }
+
+    /**
+     * Computes how many responses the read must wait for.
+     *
+     * @param consistencyLevel consistency level requested for the read
+     * @param table            name of the table (keyspace) being read
+     * @return 1 for ONE and ANY, (RF / 2) + 1 for QUORUM, RF for ALL
+     * @throws UnsupportedOperationException for any other consistency level
+     */
+    public int determineBlockFor(ConsistencyLevel consistencyLevel, String table)
+    {
+        switch (consistencyLevel)
+        {
+            case ONE:
+            case ANY:
+                return 1;
+            case QUORUM:
+                return (Table.open(table).getReplicationStrategy().getReplicationFactor() / 2) + 1;
+            case ALL:
+                return Table.open(table).getReplicationStrategy().getReplicationFactor();
+            default:
+                throw new UnsupportedOperationException("invalid consistency level: " + consistencyLevel);
+        }
+    }
+
+    /**
+     * Verifies that enough endpoints are available to satisfy blockfor.
+     *
+     * @param endpoints the live endpoints the read would be sent to
+     * @throws UnavailableException if there are fewer endpoints than blockfor
+     */
+    public void assureSufficientLiveNodes(Collection<InetAddress> endpoints) throws UnavailableException
+    {
+        if (endpoints.size() < blockfor)
+            throw new UnavailableException();
+    }
+}
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1055320&r1=1055319&r2=1055320&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java
Wed Jan 5 07:00:33 2011
@@ -328,7 +328,7 @@ public class StorageProxy implements Sto
*/
private static List<Row> fetchRows(List<ReadCommand> commands,
ConsistencyLevel consistency_level) throws IOException, UnavailableException,
TimeoutException
{
- List<QuorumResponseHandler<Row>> quorumResponseHandlers = new
ArrayList<QuorumResponseHandler<Row>>();
+ List<ReadCallback<Row>> readCallbacks = new
ArrayList<ReadCallback<Row>>();
List<List<InetAddress>> commandEndpoints = new
ArrayList<List<InetAddress>>();
List<Row> rows = new ArrayList<Row>();
Set<ReadCommand> repairs = new HashSet<ReadCommand>();
@@ -347,7 +347,7 @@ public class StorageProxy implements Sto
AbstractReplicationStrategy rs =
Table.open(command.table).getReplicationStrategy();
ReadResponseResolver resolver = new
ReadResponseResolver(command.table, command.key);
- QuorumResponseHandler<Row> handler =
rs.getQuorumResponseHandler(resolver, consistency_level);
+ ReadCallback<Row> handler = getReadCallback(resolver,
command.table, consistency_level);
handler.assureSufficientLiveNodes(endpoints);
int targets;
@@ -374,7 +374,7 @@ public class StorageProxy implements Sto
logger.debug("reading " + (m == message ? "data" :
"digest") + " for " + command + " from " + m.getMessageId() + "@" + endpoint);
}
MessagingService.instance().sendRR(messages, endpoints, handler);
- quorumResponseHandlers.add(handler);
+ readCallbacks.add(handler);
commandEndpoints.add(endpoints);
}
@@ -382,22 +382,22 @@ public class StorageProxy implements Sto
List<RepairCallback<Row>> repairResponseHandlers = null;
for (int i = 0; i < commands.size(); i++)
{
- QuorumResponseHandler<Row> quorumResponseHandler =
quorumResponseHandlers.get(i);
+ ReadCallback<Row> readCallback = readCallbacks.get(i);
Row row;
ReadCommand command = commands.get(i);
List<InetAddress> endpoints = commandEndpoints.get(i);
try
{
long startTime2 = System.currentTimeMillis();
- row = quorumResponseHandler.get();
+ row = readCallback.get();
if (row != null)
rows.add(row);
if (logger.isDebugEnabled())
- logger.debug("quorumResponseHandler: " +
(System.currentTimeMillis() - startTime2) + " ms.");
+ logger.debug("Read: " + (System.currentTimeMillis() -
startTime2) + " ms.");
if (repairs.contains(command))
- repairExecutor.schedule(new
RepairRunner(quorumResponseHandler.resolver, command, endpoints),
DatabaseDescriptor.getRpcTimeout(), TimeUnit.MILLISECONDS);
+ repairExecutor.schedule(new
RepairRunner(readCallback.resolver, command, endpoints),
DatabaseDescriptor.getRpcTimeout(), TimeUnit.MILLISECONDS);
}
catch (DigestMismatchException ex)
{
@@ -431,6 +431,15 @@ public class StorageProxy implements Sto
return rows;
}
+    /**
+     * Creates the read callback matching the requested consistency level:
+     * a DatacenterReadCallback for LOCAL_QUORUM and EACH_QUORUM, a plain
+     * ReadCallback for every other level.
+     *
+     * @param resolver         resolver that collects and reconciles the replica responses
+     * @param table            name of the table (keyspace) being read
+     * @param consistencyLevel consistency level requested for the read
+     * @return a new callback parameterized with the resolver's result type
+     */
+    static <T> ReadCallback<T> getReadCallback(IResponseResolver<T> resolver, String table, ConsistencyLevel consistencyLevel)
+    {
+        if (consistencyLevel.equals(ConsistencyLevel.LOCAL_QUORUM) || consistencyLevel.equals(ConsistencyLevel.EACH_QUORUM))
+        {
+            // Explicit <T> avoids the raw-type instantiation and its unchecked conversion.
+            return new DatacenterReadCallback<T>(resolver, consistencyLevel, table);
+        }
+        return new ReadCallback<T>(resolver, consistencyLevel, table);
+    }
+
// TODO repair resolver shouldn't take consistencylevel (it should repair
exactly as many as it receives replies for)
private static RepairCallback<Row> repair(ReadCommand command,
List<InetAddress> endpoints)
throws IOException
@@ -492,7 +501,7 @@ public class StorageProxy implements Sto
// collect replies and resolve according to consistency
level
RangeSliceResponseResolver resolver = new
RangeSliceResponseResolver(command.keyspace, liveEndpoints);
AbstractReplicationStrategy rs =
Table.open(command.keyspace).getReplicationStrategy();
- QuorumResponseHandler<List<Row>> handler =
rs.getQuorumResponseHandler(resolver, consistency_level);
+ ReadCallback<List<Row>> handler =
getReadCallback(resolver, command.keyspace, consistency_level);
// TODO bail early if live endpoints can't satisfy
requested consistency level
for (InetAddress endpoint : liveEndpoints)
{
@@ -741,7 +750,7 @@ public class StorageProxy implements Sto
// collect replies and resolve according to consistency level
RangeSliceResponseResolver resolver = new
RangeSliceResponseResolver(keyspace, liveEndpoints);
AbstractReplicationStrategy rs =
Table.open(keyspace).getReplicationStrategy();
- QuorumResponseHandler<List<Row>> handler =
rs.getQuorumResponseHandler(resolver, consistency_level);
+ ReadCallback<List<Row>> handler = getReadCallback(resolver,
keyspace, consistency_level);
// bail early if live endpoints can't satisfy requested
consistency level
if(handler.blockfor > liveEndpoints.size())
Modified:
cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/service/ConsistencyLevelTest.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/service/ConsistencyLevelTest.java?rev=1055320&r1=1055319&r2=1055320&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/service/ConsistencyLevelTest.java
(original)
+++
cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/service/ConsistencyLevelTest.java
Wed Jan 5 07:00:33 2011
@@ -96,7 +96,7 @@ public class ConsistencyLevelTest extend
IWriteResponseHandler writeHandler =
strategy.getWriteResponseHandler(hosts, hintedNodes, c);
- QuorumResponseHandler<Row> readHandler =
strategy.getQuorumResponseHandler(new ReadResponseResolver(table,
ByteBufferUtil.bytes("foo")), c);
+ ReadCallback<Row> readHandler =
StorageProxy.getReadCallback(new ReadResponseResolver(table,
ByteBufferUtil.bytes("foo")), table, c);
boolean isWriteUnavailable = false;
boolean isReadUnavailable = false;