Author: jbellis
Date: Thu Dec 22 21:34:24 2011
New Revision: 1222467
URL: http://svn.apache.org/viewvc?rev=1222467&view=rev
Log:
attempt hint delivery every ten minutes; hint handoff throttle delay default
changed to 1ms, from 50
patch by jbellis; reviewed by brandonwilliams for CASSANDRA-3554
Modified:
cassandra/branches/cassandra-1.0/CHANGES.txt
cassandra/branches/cassandra-1.0/conf/cassandra.yaml
cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/HintedHandOffManager.java
cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/HintedHandOffManagerMBean.java
cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/RowMutation.java
cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/StorageProxy.java
cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/StorageService.java
cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/StorageServiceMBean.java
Modified: cassandra/branches/cassandra-1.0/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/CHANGES.txt?rev=1222467&r1=1222466&r2=1222467&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/CHANGES.txt (original)
+++ cassandra/branches/cassandra-1.0/CHANGES.txt Thu Dec 22 21:34:24 2011
@@ -1,4 +1,7 @@
1.0.7
+ * attempt hint delivery every ten minutes, or when failure detector
+ notifies us that a node is back up, whichever comes first. hint
+ handoff throttle delay default changed to 1ms, from 50 (CASSANDRA-3554)
* add nodetool setstreamthroughput (CASSANDRA-3571)
* fix assertion when dropping a columnfamily with no sstables (CASSANDRA-3614)
* more efficient allocation of small bloom filters (CASSANDRA-3618)
Modified: cassandra/branches/cassandra-1.0/conf/cassandra.yaml
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/conf/cassandra.yaml?rev=1222467&r1=1222466&r2=1222467&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/conf/cassandra.yaml (original)
+++ cassandra/branches/cassandra-1.0/conf/cassandra.yaml Thu Dec 22 21:34:24 2011
@@ -26,8 +26,8 @@ hinted_handoff_enabled: true
# this defines the maximum amount of time a dead host will have hints
# generated. After it has been dead this long, hints will be dropped.
max_hint_window_in_ms: 3600000 # one hour
-# Sleep this long after delivering each row or row fragment
-hinted_handoff_throttle_delay_in_ms: 50
+# Sleep this long after delivering each hint
+hinted_handoff_throttle_delay_in_ms: 1
# authentication backend, implementing IAuthenticator; used to identify users
authenticator: org.apache.cassandra.auth.AllowAllAuthenticator
Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1222467&r1=1222466&r2=1222467&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Thu Dec 22 21:34:24 2011
@@ -1313,7 +1313,6 @@ public class ColumnFamilyStore implement
* @return true if we found all keys we were looking for, otherwise false
*/
public List<Row> getRangeSlice(ByteBuffer superColumn, final
AbstractBounds range, int maxResults, IFilter columnFilter)
- throws ExecutionException, InterruptedException
{
assert range instanceof Bounds
|| (!((Range)range).isWrapAround() ||
range.right.equals(StorageService.getPartitioner().getMinimumToken()))
Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/HintedHandOffManager.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/HintedHandOffManager.java?rev=1222467&r1=1222466&r2=1222467&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/HintedHandOffManager.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/HintedHandOffManager.java Thu Dec 22 21:34:24 2011
@@ -20,23 +20,26 @@ package org.apache.cassandra.db;
import java.io.DataInputStream;
import java.io.IOException;
-import java.io.InputStream;
import java.lang.management.ManagementFactory;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.management.MBeanServer;
import javax.management.ObjectName;
+import com.google.common.collect.ImmutableSortedSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.db.filter.IFilter;
+import org.apache.cassandra.db.filter.NamesQueryFilter;
import org.apache.cassandra.db.filter.QueryFilter;
import org.apache.cassandra.db.filter.QueryPath;
import org.apache.cassandra.dht.IPartitioner;
@@ -61,7 +64,7 @@ import org.cliffc.high_scale_lib.NonBloc
* (We have to use String keys for compatibility with OPP.)
* SuperColumns in these rows are the mutations to replay, with uuid names:
*
- * <dest ip>: { // key
+ * <dest token>: { // key
* <uuid>: { // supercolumn
* mutation: <mutation> // subcolumn
* version: <mutation serialization version>
@@ -96,7 +99,7 @@ public class HintedHandOffManager implem
private final ExecutorService executor_ = new
JMXEnabledThreadPoolExecutor("HintedHandoff", Thread.MIN_PRIORITY);
- public HintedHandOffManager()
+ public void start()
{
MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
try
@@ -107,25 +110,23 @@ public class HintedHandOffManager implem
{
throw new RuntimeException(e);
}
- }
- public void registerMBean()
- {
logger_.debug("Created HHOM instance, registered MBean.");
+
+ Runnable runnable = new Runnable()
+ {
+ public void run()
+ {
+ scheduleAllDeliveries();
+ }
+ };
+ StorageService.optionalTasks.scheduleWithFixedDelay(runnable, 10, 10,
TimeUnit.MINUTES);
}
- private static boolean sendMutation(InetAddress endpoint, RowMutation
mutation) throws IOException
+ private static void sendMutation(InetAddress endpoint, RowMutation
mutation) throws TimeoutException
{
IWriteResponseHandler responseHandler =
WriteResponseHandler.create(endpoint);
MessagingService.instance().sendRR(mutation, endpoint,
responseHandler);
-
- try
- {
- responseHandler.get();
- }
- catch (TimeoutException e)
- {
- return false;
- }
+ responseHandler.get();
try
{
@@ -135,8 +136,6 @@ public class HintedHandOffManager implem
{
throw new AssertionError(e);
}
-
- return true;
}
private static void deleteHint(ByteBuffer tokenBytes, ByteBuffer hintId,
long timestamp) throws IOException
@@ -226,7 +225,7 @@ public class HintedHandOffManager implem
logger_.debug("schema for {} matches local schema", endpoint);
return waited;
}
-
+
private void deliverHintsToEndpoint(InetAddress endpoint) throws
IOException, DigestMismatchException, InvalidRequestException,
TimeoutException, InterruptedException
{
ColumnFamilyStore hintStore =
Table.open(Table.SYSTEM_TABLE).getColumnFamilyStore(HINTS_CF);
@@ -275,12 +274,12 @@ public class HintedHandOffManager implem
while (true)
{
QueryFilter filter = QueryFilter.getSliceFilter(epkey, new
QueryPath(HINTS_CF), startColumn, ByteBufferUtil.EMPTY_BYTE_BUFFER, false,
PAGE_SIZE);
- ColumnFamily hintColumnFamily =
ColumnFamilyStore.removeDeleted(hintStore.getColumnFamily(filter),
Integer.MAX_VALUE);
- if (pagingFinished(hintColumnFamily, startColumn))
+ ColumnFamily hintsPage =
ColumnFamilyStore.removeDeleted(hintStore.getColumnFamily(filter),
Integer.MAX_VALUE);
+ if (pagingFinished(hintsPage, startColumn))
break;
page:
- for (IColumn hint : hintColumnFamily.getSortedColumns())
+ for (IColumn hint : hintsPage.getSortedColumns())
{
startColumn = hint.name();
for (IColumn subColumn : hint.getSubColumns())
@@ -305,14 +304,15 @@ public class HintedHandOffManager implem
DataInputStream in = new
DataInputStream(ByteBufferUtil.inputStream(mutationColumn.value()));
RowMutation rm = RowMutation.serializer().deserialize(in,
ByteBufferUtil.toInt(versionColumn.value()));
- if (sendMutation(endpoint, rm))
+ try
{
+ sendMutation(endpoint, rm);
deleteHint(tokenBytes, hint.name(), hint.maxTimestamp());
rowsReplayed++;
}
- else
+ catch (TimeoutException e)
{
- logger_.info("Could not complete hinted handoff to " +
endpoint);
+ logger_.info(String.format("Timed out replaying hints to
%s; aborting further deliveries", endpoint));
break delivery;
}
}
@@ -335,12 +335,37 @@ public class HintedHandOffManager implem
rowsReplayed, endpoint));
}
+ /**
+ * Attempt delivery to any node for which we have hints. Necessary since we can generate hints even for
+ * nodes which are never officially down/failed. Hint rows whose token no longer maps to an endpoint are skipped.
+ */
+ private void scheduleAllDeliveries()
+ {
+ if (logger_.isDebugEnabled())
+ logger_.debug("Started scheduleAllDeliveries");
+
+ ColumnFamilyStore hintStore = Table.open(Table.SYSTEM_TABLE).getColumnFamilyStore(HINTS_CF);
+ IPartitioner p = StorageService.getPartitioner();
+ Range range = new Range(p.getMinimumToken(), p.getMinimumToken(), p);
+ IFilter filter = new NamesQueryFilter(ImmutableSortedSet.<ByteBuffer>of());
+ List<Row> rows = hintStore.getRangeSlice(null, range, Integer.MAX_VALUE, filter);
+ for (Row row : rows)
+ {
+ Token<?> token = StorageService.getPartitioner().getTokenFactory().fromByteArray(row.key.key);
+ InetAddress target = StorageService.instance.getTokenMetadata().getEndpoint(token);
+ if (target != null) scheduleHintDelivery(target); // getEndpoint may return null if the token has since left the ring
+ }
+
+ if (logger_.isDebugEnabled())
+ logger_.debug("Finished scheduleAllDeliveries");
+ }
+
/*
* This method is used to deliver hints to a particular endpoint.
* When we learn that some endpoint is back up we deliver the data
* to him via an event driven mechanism.
*/
- public void deliverHints(final InetAddress to)
+ public void scheduleHintDelivery(final InetAddress to)
{
logger_.debug("deliverHints to {}", to);
if (!queuedDeliveries.add(to))
@@ -356,9 +381,9 @@ public class HintedHandOffManager implem
executor_.execute(r);
}
- public void deliverHints(String to) throws UnknownHostException
+ public void scheduleHintDelivery(String to) throws UnknownHostException
{
- deliverHints(InetAddress.getByName(to));
+ scheduleHintDelivery(InetAddress.getByName(to));
}
public List<String> listEndpointsPendingHints()
Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/HintedHandOffManagerMBean.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/HintedHandOffManagerMBean.java?rev=1222467&r1=1222466&r2=1222467&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/HintedHandOffManagerMBean.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/HintedHandOffManagerMBean.java Thu Dec 22 21:34:24 2011
@@ -18,6 +18,7 @@
package org.apache.cassandra.db;
+import java.net.UnknownHostException;
import java.util.List;
import java.util.Map;
@@ -25,9 +26,9 @@ public interface HintedHandOffManagerMBe
{
/**
* Nuke all hints from this node to `ep`.
- * @param epaddr String rep. of endpoint address to delete hints for,
either ip address ("127.0.0.1") or hostname
+ * @param host String rep. of endpoint address to delete hints for, either
ip address ("127.0.0.1") or hostname
*/
- public void deleteHintsForEndpoint(final String epaddr);
+ public void deleteHintsForEndpoint(final String host);
/**
* List all the endpoints that this node has hints for.
@@ -42,5 +43,8 @@ public interface HintedHandOffManagerMBe
* @return map of endpoint -> hint count
*/
public Map<String, Integer> countPendingHints();
+
+ /** force hint delivery to an endpoint **/
+ public void scheduleHintDelivery(String host) throws UnknownHostException;
}
Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/RowMutation.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/RowMutation.java?rev=1222467&r1=1222466&r2=1222467&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/RowMutation.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/RowMutation.java Thu Dec 22 21:34:24 2011
@@ -112,7 +112,7 @@ public class RowMutation implements IMut
* The format is the following:
*
* HintsColumnFamily: { // cf
- * <dest ip>: { // key
+ * <dest token>: { // key
* <uuid>: { // super-column
* table: <table> // columns
* key: <key>
Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1222467&r1=1222466&r2=1222467&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/StorageProxy.java Thu Dec 22 21:34:24 2011
@@ -804,21 +804,10 @@ public class StorageProxy implements Sto
if (logger.isDebugEnabled())
logger.debug("local range slice");
ColumnFamilyStore cfs =
Table.open(command.keyspace).getColumnFamilyStore(command.column_family);
- try
- {
- rows.addAll(cfs.getRangeSlice(command.super_column,
- range,
- command.max_keys,
-
QueryFilter.getFilter(command.predicate, cfs.getComparator())));
- }
- catch (ExecutionException e)
- {
- throw new RuntimeException(e.getCause());
- }
- catch (InterruptedException e)
- {
- throw new AssertionError(e);
- }
+ rows.addAll(cfs.getRangeSlice(command.super_column,
+ range,
+ command.max_keys,
+
QueryFilter.getFilter(command.predicate, cfs.getComparator())));
}
else
{
Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/StorageService.java?rev=1222467&r1=1222466&r2=1222467&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/StorageService.java Thu Dec 22 21:34:24 2011
@@ -510,7 +510,7 @@ public class StorageService implements I
MigrationManager.passiveAnnounce(Schema.instance.getVersion());
Gossiper.instance.addLocalApplicationState(ApplicationState.RELEASE_VERSION,
valueFactory.releaseVersion());
- HintedHandOffManager.instance.registerMBean();
+ HintedHandOffManager.instance.start();
if (DatabaseDescriptor.isAutoBootstrap()
&&
DatabaseDescriptor.getSeeds().contains(FBUtilities.getBroadcastAddress())
@@ -1465,7 +1465,7 @@ public class StorageService implements I
public void onAlive(InetAddress endpoint, EndpointState state)
{
if (!isClientMode && getTokenMetadata().isMember(endpoint))
- deliverHints(endpoint);
+ HintedHandOffManager.instance.scheduleHintDelivery(endpoint);
}
public void onRemove(InetAddress endpoint)
@@ -1516,18 +1516,9 @@ public class StorageService implements I
return map;
}
- /**
- * Deliver hints to the specified node when it has crashed
- * and come back up/ marked as alive after a network partition
- */
- public final void deliverHints(InetAddress endpoint)
- {
- HintedHandOffManager.instance.deliverHints(endpoint);
- }
-
public final void deliverHints(String host) throws UnknownHostException
{
- HintedHandOffManager.instance.deliverHints(host);
+ HintedHandOffManager.instance.scheduleHintDelivery(host);
}
public Token getLocalToken()
Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/StorageServiceMBean.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/StorageServiceMBean.java?rev=1222467&r1=1222466&r2=1222467&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/StorageServiceMBean.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/StorageServiceMBean.java Thu Dec 22 21:34:24 2011
@@ -300,9 +300,6 @@ public interface StorageServiceMBean
*/
public void truncate(String keyspace, String columnFamily) throws
UnavailableException, TimeoutException, IOException;
- /** force hint delivery to an endpoint **/
- public void deliverHints(String host) throws UnknownHostException;
-
/** save row and key caches */
public void saveCaches() throws ExecutionException, InterruptedException;