Author: slebresne
Date: Mon Nov 7 13:26:00 2011
New Revision: 1198725
URL: http://svn.apache.org/viewvc?rev=1198725&view=rev
Log:
patch by slebresne; reviewed by yukim for CASSANDRA-3178
Modified:
cassandra/branches/cassandra-0.8/CHANGES.txt
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/config/CFMetaData.java
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/CounterColumn.java
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/CounterMutation.java
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/Memtable.java
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/CompactionController.java
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/context/CounterContext.java
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageProxy.java
cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/db/CounterMutationTest.java
cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/db/context/CounterContextTest.java
Modified: cassandra/branches/cassandra-0.8/CHANGES.txt
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/CHANGES.txt?rev=1198725&r1=1198724&r2=1198725&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.8/CHANGES.txt Mon Nov 7 13:26:00 2011
@@ -38,6 +38,7 @@
* acquire compactionlock during truncate (CASSANDRA-3399)
* fix displaying cfdef entries for super columnfamilies (CASSANDRA-3415)
* (Hadoop) Fix empty row filtering (CASSANDRA-3450)
+ * Make counter shard merging thread safe (CASSANDRA-3178)
0.8.7
* Kill server on wrapped OOME such as from FileChannel.map (CASSANDRA-3201)
Modified:
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/config/CFMetaData.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/config/CFMetaData.java?rev=1198725&r1=1198724&r2=1198725&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/config/CFMetaData.java
(original)
+++
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/config/CFMetaData.java
Mon Nov 7 13:26:00 2011
@@ -150,6 +150,8 @@ public final class CFMetaData
private int memtableFlushAfterMins; // default 60
private int memtableThroughputInMb; // default based on heap
size
private double memtableOperationsInMillions; // default based on
throughput
+ // mergeShardsChance is now obsolete, but left here so as to not break
+ // thrift compatibility
private double mergeShardsChance; // default 0.1, chance
[0.0, 1.0] of merging old shards during replication
private IRowCacheProvider rowCacheProvider;
private ByteBuffer keyAlias; // default NULL
Modified:
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1198725&r1=1198724&r2=1198725&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
(original)
+++
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
Mon Nov 7 13:26:00 2011
@@ -2308,4 +2308,16 @@ public class ColumnFamilyStore implement
return reader;
}
+
+ /**
+ * Returns the creation time of the oldest memtable not fully flushed yet.
+ */
+ public long oldestUnflushedMemtable()
+ {
+ DataTracker.View view = data.getView();
+ long oldest = view.memtable.creationTime();
+ for (Memtable memtable : view.memtablesPendingFlush)
+ oldest = Math.min(oldest, memtable.creationTime());
+ return oldest;
+ }
}
Modified:
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/CounterColumn.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/CounterColumn.java?rev=1198725&r1=1198724&r2=1198725&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/CounterColumn.java
(original)
+++
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/CounterColumn.java
Mon Nov 7 13:26:00 2011
@@ -22,17 +22,24 @@ import java.io.IOException;
import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.security.MessageDigest;
+import java.util.concurrent.TimeoutException;
import java.util.Arrays;
import java.util.Map;
+import com.google.common.collect.Multimap;
import org.apache.log4j.Logger;
import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.context.CounterContext;
import org.apache.cassandra.db.context.IContext.ContextRelationship;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.MarshalException;
import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.service.IWriteResponseHandler;
+import org.apache.cassandra.service.StorageProxy;
+import org.apache.cassandra.thrift.ConsistencyLevel;
+import org.apache.cassandra.thrift.UnavailableException;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.NodeId;
@@ -223,9 +230,9 @@ public class CounterColumn extends Colum
return contextManager.hasNodeId(value(), id);
}
- public CounterColumn computeOldShardMerger()
+ private CounterColumn computeOldShardMerger(int mergeBefore)
{
- ByteBuffer bb = contextManager.computeOldShardMerger(value(),
NodeId.getOldLocalNodeIds());
+ ByteBuffer bb = contextManager.computeOldShardMerger(value(),
NodeId.getOldLocalNodeIds(), mergeBefore);
if (bb == null)
return null;
else
@@ -243,8 +250,19 @@ public class CounterColumn extends Colum
}
}
- public static void removeOldShards(ColumnFamily cf, int gcBefore)
+ /**
+ * There are two phases to the removal of old shards.
+ * First phase: we merge the old shard value into the current shard and
+ * 'nullify' the old one. We then send the counter context with the old
+ * shard nullified to all other replicas.
+ * Second phase: once an old shard has been nullified for longer than
+ * gc_grace (to be sure all other replicas are aware of the merge), we
+ * simply remove that old shard from the context (its value is 0).
+ * This method does both phases.
+ */
+ public static void mergeAndRemoveOldShards(DecoratedKey key, ColumnFamily
cf, int gcBefore, int mergeBefore)
{
+ ColumnFamily remoteMerger = null;
if (!cf.isSuper())
{
for (Map.Entry<ByteBuffer, IColumn> entry :
cf.getColumnsMap().entrySet())
@@ -253,8 +271,18 @@ public class CounterColumn extends Colum
IColumn c = entry.getValue();
if (!(c instanceof CounterColumn))
continue;
- CounterColumn cleaned = ((CounterColumn)
c).removeOldShards(gcBefore);
- if (cleaned != c)
+ CounterColumn cc = (CounterColumn) c;
+ CounterColumn shardMerger =
cc.computeOldShardMerger(mergeBefore);
+ CounterColumn merged = cc;
+ if (shardMerger != null)
+ {
+ merged = (CounterColumn) cc.reconcile(shardMerger);
+ if (remoteMerger == null)
+ remoteMerger = cf.cloneMeShallow();
+ remoteMerger.addColumn(merged);
+ }
+ CounterColumn cleaned = merged.removeOldShards(gcBefore);
+ if (cleaned != cc)
{
cf.remove(cname);
cf.addColumn(cleaned);
@@ -270,7 +298,17 @@ public class CounterColumn extends Colum
{
if (!(subColumn instanceof CounterColumn))
continue;
- CounterColumn cleaned = ((CounterColumn)
subColumn).removeOldShards(gcBefore);
+ CounterColumn cc = (CounterColumn) subColumn;
+ CounterColumn shardMerger =
cc.computeOldShardMerger(mergeBefore);
+ CounterColumn merged = cc;
+ if (shardMerger != null)
+ {
+ merged = (CounterColumn) cc.reconcile(shardMerger);
+ if (remoteMerger == null)
+ remoteMerger = cf.cloneMeShallow();
+ remoteMerger.addColumn(c.name(), merged);
+ }
+ CounterColumn cleaned = merged.removeOldShards(gcBefore);
if (cleaned != subColumn)
{
c.remove(subColumn.name());
@@ -279,6 +317,40 @@ public class CounterColumn extends Colum
}
}
}
+
+ if (remoteMerger != null)
+ {
+ try
+ {
+ sendToOtherReplica(key, remoteMerger);
+ }
+ catch (Exception e)
+ {
+ logger.error("Error while sending shard merger mutation to
remote endpoints", e);
+ }
+ }
}
+ private static void sendToOtherReplica(DecoratedKey key, ColumnFamily cf)
throws UnavailableException, TimeoutException, IOException
+ {
+ RowMutation rm = new RowMutation(cf.metadata().ksName, key.key);
+ rm.add(cf);
+
+ final InetAddress local = FBUtilities.getLocalAddress();
+ String localDataCenter =
DatabaseDescriptor.getEndpointSnitch().getDatacenter(local);
+
+ StorageProxy.performWrite(rm, ConsistencyLevel.ANY, localDataCenter,
new StorageProxy.WritePerformer()
+ {
+ public void apply(IMutation mutation, Multimap<InetAddress,
InetAddress> hintedEndpoints, IWriteResponseHandler responseHandler, String
localDataCenter, ConsistencyLevel consistency_level) throws IOException
+ {
+ // We should only send to the remote replica, not the local one
+ hintedEndpoints.remove(local, local);
+ // Fake local response to be a good lad but we won't wait on
the responseHandler
+ responseHandler.response(null);
+ StorageProxy.sendToHintedEndpoints((RowMutation) mutation,
hintedEndpoints, responseHandler, localDataCenter, consistency_level);
+ }
+ });
+
+ // we don't wait for answers
+ }
}
Modified:
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/CounterMutation.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/CounterMutation.java?rev=1198725&r1=1198724&r2=1198725&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/CounterMutation.java
(original)
+++
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/CounterMutation.java
Mon Nov 7 13:26:00 2011
@@ -106,7 +106,6 @@ public class CounterMutation implements
if (row == null || row.cf == null)
continue;
- row = mergeOldShards(readCommand.table, row);
ColumnFamily cf = row.cf;
if (cf.isSuper())
cf.retainAll(rowMutation.getColumnFamily(cf.metadata().cfId));
@@ -121,73 +120,6 @@ public class CounterMutation implements
commands.add(new SliceByNamesReadCommand(table, key, queryPath,
columnFamily.getColumnNames()));
}
- private Row mergeOldShards(String table, Row row) throws IOException
- {
- ColumnFamily cf = row.cf;
- // random check for merging to allow lessening the performance impact
- if (cf.metadata().getMergeShardsChance() >
FBUtilities.threadLocalRandom().nextDouble())
- {
- ColumnFamily merger = computeShardMerger(cf);
- if (merger != null)
- {
- RowMutation localMutation = new RowMutation(table,
row.key.key);
- localMutation.add(merger);
- localMutation.apply();
-
- cf.addAll(merger);
- }
- }
- return row;
- }
-
- private ColumnFamily computeShardMerger(ColumnFamily cf)
- {
- ColumnFamily merger = null;
-
- // CF type: regular
- if (!cf.isSuper())
- {
- for (IColumn column : cf.getSortedColumns())
- {
- if (!(column instanceof CounterColumn))
- continue;
- IColumn c = ((CounterColumn)column).computeOldShardMerger();
- if (c != null)
- {
- if (merger == null)
- merger = cf.cloneMeShallow();
- merger.addColumn(c);
- }
- }
- }
- else // CF type: super
- {
- for (IColumn superColumn : cf.getSortedColumns())
- {
- IColumn mergerSuper = null;
- for (IColumn column : superColumn.getSubColumns())
- {
- if (!(column instanceof CounterColumn))
- continue;
- IColumn c =
((CounterColumn)column).computeOldShardMerger();
- if (c != null)
- {
- if (mergerSuper == null)
- mergerSuper =
((SuperColumn)superColumn).cloneMeShallow();
- mergerSuper.addColumn(c);
- }
- }
- if (mergerSuper != null)
- {
- if (merger == null)
- merger = cf.cloneMeShallow();
- merger.addColumn(mergerSuper);
- }
- }
- }
- return merger;
- }
-
public Message makeMutationMessage(int version) throws IOException
{
ByteArrayOutputStream bos = new ByteArrayOutputStream();
Modified:
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/Memtable.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/Memtable.java?rev=1198725&r1=1198724&r2=1198725&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/Memtable.java
(original)
+++
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/Memtable.java
Mon Nov 7 13:26:00 2011
@@ -399,4 +399,9 @@ public class Memtable implements Compara
{
return System.currentTimeMillis() > creationTime +
cfs.getMemtableFlushAfterMins() * 60 * 1000L;
}
+
+ public long creationTime()
+ {
+ return creationTime;
+ }
}
Modified:
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/CompactionController.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/CompactionController.java?rev=1198725&r1=1198724&r2=1198725&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/CompactionController.java
(original)
+++
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/CompactionController.java
Mon Nov 7 13:26:00 2011
@@ -46,6 +46,7 @@ public class CompactionController
public final boolean isMajor;
public final int gcBefore;
+ public final int mergeShardBefore;
private int throttleResolution;
public CompactionController(ColumnFamilyStore cfs,
Collection<SSTableReader> sstables, int gcBefore, boolean forceDeserialize)
@@ -54,6 +55,11 @@ public class CompactionController
this.cfs = cfs;
this.sstables = new HashSet<SSTableReader>(sstables);
this.gcBefore = gcBefore;
+ // If we merge an old NodeId, we must make sure that no further
increments for that id are in an active memtable.
+ // For that, we must make sure that this id was renewed before the
creation of the oldest unflushed memtable. We
+ // add 5 minutes to be sure we're on the safe side in terms of thread
safety (though we should be fine in our
+ // current 'stop all write during memtable switch' situation).
+ this.mergeShardBefore = (int) ((cfs.oldestUnflushedMemtable() + 5 *
3600) / 1000);
this.forceDeserialize = forceDeserialize;
isMajor = cfs.isCompleteSSTables(this.sstables);
// how many rows we expect to compact in 100ms
Modified:
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java?rev=1198725&r1=1198724&r2=1198725&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
(original)
+++
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
Mon Nov 7 13:26:00 2011
@@ -202,7 +202,7 @@ public class LazilyCompactedRow extends
protected IColumn getReduced()
{
- ColumnFamily purged =
PrecompactedRow.removeDeletedAndOldShards(shouldPurge, controller, container);
+ ColumnFamily purged =
PrecompactedRow.removeDeletedAndOldShards(key, shouldPurge, controller,
container);
if (purged == null || !purged.iterator().hasNext())
{
container.clear();
Modified:
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java?rev=1198725&r1=1198724&r2=1198725&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
(original)
+++
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
Mon Nov 7 13:26:00 2011
@@ -57,14 +57,14 @@ public class PrecompactedRow extends Abs
public static ColumnFamily removeDeletedAndOldShards(DecoratedKey key,
CompactionController controller, ColumnFamily cf)
{
- return removeDeletedAndOldShards(controller.shouldPurge(key),
controller, cf);
+ return removeDeletedAndOldShards(key, controller.shouldPurge(key),
controller, cf);
}
- public static ColumnFamily removeDeletedAndOldShards(boolean shouldPurge,
CompactionController controller, ColumnFamily cf)
+ public static ColumnFamily removeDeletedAndOldShards(DecoratedKey key,
boolean shouldPurge, CompactionController controller, ColumnFamily cf)
{
ColumnFamily compacted = shouldPurge ?
ColumnFamilyStore.removeDeleted(cf, controller.gcBefore) : cf;
if (shouldPurge && compacted != null &&
compacted.metadata().getDefaultValidator().isCommutative())
- CounterColumn.removeOldShards(compacted, controller.gcBefore);
+ CounterColumn.mergeAndRemoveOldShards(key, compacted,
controller.gcBefore, controller.mergeShardBefore);
return compacted;
}
Modified:
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/context/CounterContext.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/context/CounterContext.java?rev=1198725&r1=1198724&r2=1198725&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/context/CounterContext.java
(original)
+++
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/context/CounterContext.java
Mon Nov 7 13:26:00 2011
@@ -69,10 +69,6 @@ public class CounterContext implements I
private static final Logger logger =
Logger.getLogger(CounterContext.class);
- // Time in ms since a node id has been renewed before we consider using it
- // during a merge
- private static final long MIN_MERGE_DELAY = 5 * 60 * 1000; // should be
aplenty
-
// lazy-load singleton
private static class LazyHolder
{
@@ -502,84 +498,123 @@ public class CounterContext implements I
/**
* Compute a new context such that if applied to context yields the same
- * total but with the older local node id merged into the second to older
one
- * (excluding current local node id) if need be.
+ * total but with old local node ids nullified and their content merged into
+ * the current localNodeId.
*/
- public ByteBuffer computeOldShardMerger(ByteBuffer context,
List<NodeId.NodeIdRecord> oldIds)
+ public ByteBuffer computeOldShardMerger(ByteBuffer context,
List<NodeId.NodeIdRecord> oldIds, long mergeBefore)
{
long now = System.currentTimeMillis();
int hlength = headerLength(context);
-
- // Don't bother if we know we can't find what we are looking for
- if (oldIds.size() < 2
- || now - oldIds.get(0).timestamp < MIN_MERGE_DELAY
- || now - oldIds.get(1).timestamp < MIN_MERGE_DELAY
- || context.remaining() - hlength < 2 * STEP_LENGTH)
- return null;
+ NodeId localId = NodeId.getLocalId();
Iterator<NodeId.NodeIdRecord> recordIterator = oldIds.iterator();
- NodeId.NodeIdRecord currRecord = recordIterator.next();
+ NodeId.NodeIdRecord currRecord = recordIterator.hasNext() ?
recordIterator.next() : null;
ContextState state = new ContextState(context, hlength);
ContextState foundState = null;
+ List<NodeId> toMerge = new ArrayList<NodeId>();
+ long mergeTotal = 0;
while (state.hasRemaining() && currRecord != null)
{
- if (now - currRecord.timestamp < MIN_MERGE_DELAY)
- return context;
+ assert !currRecord.id.equals(localId);
- assert !currRecord.id.equals(NodeId.getLocalId());
+ NodeId nodeId = state.getNodeId();
+ int c = nodeId.compareTo(currRecord.id);
- int c = state.getNodeId().compareTo(currRecord.id);
- if (c == 0)
+ if (c > 0)
+ {
+ currRecord = recordIterator.hasNext() ? recordIterator.next()
: null;
+ continue;
+ }
+
+ if (state.isDelta())
{
- if (foundState == null)
+ if (state.getClock() < 0)
{
- // We found a canditate for being merged
- if (state.getClock() < 0)
- return null;
-
- foundState = state.duplicate();
- currRecord = recordIterator.hasNext() ?
recordIterator.next() : null;
- state.moveToNext();
+ // Already merged shard, waiting to be collected
+
+ if (nodeId.equals(localId))
+ // we should not get there, but we have been creating
problematic context prior to #2968
+ throw new RuntimeException("Current nodeId with a
negative clock (likely due to #2968). You need to restart this node with
-Dcassandra.renew_counter_id=true to fix.");
+
+ if (state.getCount() != 0)
+ {
+ // This should not happen, but previous bugs have
generated this (#2968 in particular) so fixing it.
+ logger.error(String.format("Invalid counter context
(clock is %d and count is %d for NodeId %s), will fix", state.getCount(),
state.getCount(), nodeId.toString()));
+ toMerge.add(nodeId);
+ mergeTotal += state.getCount();
+ }
}
- else
+ else if (c == 0)
{
- assert !foundState.getNodeId().equals(state.getNodeId());
-
- // Found someone to merge it to
- int nbDelta = foundState.isDelta() ? 1 : 0;
- nbDelta += state.isDelta() ? 1 : 0;
- ContextState merger = ContextState.allocate(2, nbDelta);
-
- long fclock = foundState.getClock();
- long fcount = foundState.getCount();
- long clock = state.getClock();
- long count = state.getCount();
+ // Found an old id. However, merging an oldId that has
just been renewed isn't safe, so
+ // we check that it has been renewed before mergeBefore.
+ if (currRecord.timestamp < mergeBefore)
+ {
+ toMerge.add(nodeId);
+ mergeTotal += state.getCount();
+ }
+ }
+ }
- if (foundState.isDelta())
- merger.writeElement(foundState.getNodeId(), -now -
fclock, -fcount, true);
- else
- merger.writeElement(foundState.getNodeId(), -now, 0);
+ if (c == 0)
+ currRecord = recordIterator.hasNext() ? recordIterator.next()
: null;
- if (state.isDelta())
- merger.writeElement(state.getNodeId(), fclock + clock,
fcount, true);
- else
- merger.writeElement(state.getNodeId(), fclock + clock,
fcount + count);
+ state.moveToNext();
+ }
+ // Continuing the iteration so that we can repair invalid shards
+ while (state.hasRemaining())
+ {
+ NodeId nodeId = state.getNodeId();
+ if (state.isDelta() && state.getClock() < 0)
+ {
+ if (nodeId.equals(localId))
+ // we should not get there, but we have been creating
problematic context prior to #2968
+ throw new RuntimeException("Current nodeId with a negative
clock (likely due to #2968). You need to restart this node with
-Dcassandra.renew_counter_id=true to fix.");
- return merger.context;
+ if (state.getCount() != 0)
+ {
+ // This should not happen, but previous bugs have
generated this (#2968 in particular) so fixing it.
+ logger.error(String.format("Invalid counter context (clock
is %d and count is %d for NodeId %s), will fix", state.getClock(),
state.getCount(), nodeId.toString()));
+ toMerge.add(nodeId);
+ mergeTotal += state.getCount();
}
}
- else if (c < 0) // nodeid < record
+ state.moveToNext();
+ }
+
+ if (toMerge.isEmpty())
+ return null;
+
+ ContextState merger = ContextState.allocate(toMerge.size() + 1,
toMerge.size() + 1);
+ state.reset();
+ int i = 0;
+ int removedTotal = 0;
+ boolean localWritten = false;
+ while (state.hasRemaining())
+ {
+ NodeId nodeId = state.getNodeId();
+ if (nodeId.compareTo(localId) > 0)
{
- state.moveToNext();
+ merger.writeElement(localId, 1L, mergeTotal, true);
+ localWritten = true;
}
- else // c > 0, nodeid > record
+ else if (i < toMerge.size() && nodeId.compareTo(toMerge.get(i)) ==
0)
{
- currRecord = recordIterator.hasNext() ? recordIterator.next()
: null;
+ long count = state.getCount();
+ removedTotal += count;
+ merger.writeElement(nodeId, -now - state.getClock(), -count,
true);
+ ++i;
}
+ state.moveToNext();
}
- return null;
+ if (!localWritten)
+ merger.writeElement(localId, 1L, mergeTotal, true);
+
+ // sanity check
+ assert mergeTotal == removedTotal;
+ return merger.context;
}
/**
@@ -592,59 +627,61 @@ public class CounterContext implements I
{
int hlength = headerLength(context);
ContextState state = new ContextState(context, hlength);
- int removedBodySize = 0, removedHeaderSize = 0;
- boolean forceFixing = false;
+ int removedShards = 0;
while (state.hasRemaining())
{
long clock = state.getClock();
- if (clock < 0 && -((int)(clock / 1000)) < gcBefore &&
(state.getCount() == 0 || !state.isDelta()))
- {
- removedBodySize += STEP_LENGTH;
- if (state.isDelta())
- removedHeaderSize += HEADER_ELT_LENGTH;
- }
- else if (clock < 0 && state.getCount() != 0 && state.isDelta())
+ if (clock < 0)
{
- forceFixing = true;
+ // We should never have a count != 0 when clock < 0.
+ // We know that previous bugs may have created those situations
though, so:
+ // * for delta shard: we throw an exception since
computeOldShardMerger should
+ // have corrected that situation
+ // * for non-delta shard: it is a much more crappier
situation because there is
+ // not much we can do since we are not responsible for
that shard. So we simply
+ // ignore the shard.
+ if (state.getCount() != 0)
+ {
+ if (state.isDelta())
+ {
+ throw new IllegalStateException("Counter shard with
negative clock but count != 0; context = " + toString(context));
+ }
+ else
+ {
+ logger.debug("Ignoring non-removable non-delta
corrupted shard in context " + toString(context));
+ state.moveToNext();
+ continue;
+ }
+ }
+
+ if (-((int)(clock / 1000)) < gcBefore)
+ removedShards++;
}
state.moveToNext();
}
- if (removedBodySize == 0 && !forceFixing)
+ if (removedShards == 0)
return context;
- int newSize = context.remaining() - removedHeaderSize -
removedBodySize;
+
+ int removedHeaderSize = removedShards * HEADER_ELT_LENGTH;
+ int newSize = context.remaining() - removedHeaderSize - (removedShards
* STEP_LENGTH);
int newHlength = hlength - removedHeaderSize;
ByteBuffer cleanedContext = ByteBuffer.allocate(newSize);
cleanedContext.putShort(cleanedContext.position(), (short)
((newHlength - HEADER_SIZE_LENGTH) / HEADER_ELT_LENGTH));
ContextState cleaned = new ContextState(cleanedContext, newHlength);
state.reset();
- long toAddBack = 0;
while (state.hasRemaining())
{
long clock = state.getClock();
- if (!(clock < 0 && -((int)(clock / 1000)) < gcBefore &&
(state.getCount() == 0 || !state.isDelta())))
+ if (!(clock < 0 && state.getCount() == 0))
{
- if (clock < 0 && state.getCount() != 0 && state.isDelta())
- {
- // we should not get there, but we have been creating
problematic context prior to #2968
- if (state.getNodeId().equals(NodeId.getLocalId()))
- throw new RuntimeException("Merged counter shard with
a count != 0 (likely due to #2968). You need to restart this node with
-Dcassandra.renew_counter_id=true to fix.");
-
- // we will "fix" it, but log a message
- logger.info("Collectable old shard with a count != 0. Will
fix.");
- cleaned.writeElement(state.getNodeId(), clock - 1L, 0,
true);
- toAddBack += state.getCount();
- }
- else
- {
- state.copyTo(cleaned);
- }
+ state.copyTo(cleaned);
}
state.moveToNext();
}
- return toAddBack == 0 ? cleanedContext : merge(cleanedContext,
create(toAddBack));
+ return cleanedContext;
}
/**
Modified:
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageProxy.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1198725&r1=1198724&r2=1198725&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageProxy.java
(original)
+++
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageProxy.java
Mon Nov 7 13:26:00 2011
@@ -227,7 +227,7 @@ public class StorageProxy implements Sto
return
ss.getTokenMetadata().getWriteEndpoints(StorageService.getPartitioner().getToken(key),
table, naturalEndpoints);
}
- private static void sendToHintedEndpoints(final RowMutation rm,
Multimap<InetAddress, InetAddress> hintedEndpoints, IWriteResponseHandler
responseHandler, String localDataCenter, ConsistencyLevel consistency_level)
+ public static void sendToHintedEndpoints(final RowMutation rm,
Multimap<InetAddress, InetAddress> hintedEndpoints, IWriteResponseHandler
responseHandler, String localDataCenter, ConsistencyLevel consistency_level)
throws IOException
{
// Multimap that holds onto all the messages and addresses meant for a
specific datacenter
@@ -1105,7 +1105,7 @@ public class StorageProxy implements Sto
return !Gossiper.instance.getUnreachableMembers().isEmpty();
}
- private interface WritePerformer
+ public interface WritePerformer
{
public void apply(IMutation mutation, Multimap<InetAddress,
InetAddress> hintedEndpoints, IWriteResponseHandler responseHandler, String
localDataCenter, ConsistencyLevel consistency_level) throws IOException;
}
Modified:
cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/db/CounterMutationTest.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/db/CounterMutationTest.java?rev=1198725&r1=1198724&r2=1198725&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/db/CounterMutationTest.java
(original)
+++
cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/db/CounterMutationTest.java
Mon Nov 7 13:26:00 2011
@@ -19,6 +19,7 @@ package org.apache.cassandra.db;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.Collections;
import java.util.List;
import org.junit.Test;
@@ -87,6 +88,7 @@ public class CounterMutationTest extends
public void testRemoveOldShardFixCorrupted() throws IOException
{
CounterContext ctx = CounterContext.instance();
+ int now = (int) (System.currentTimeMillis() / 1000);
// Check that corrupted context created prior to #2968 are fixed by
removeOldShards
NodeId id1 = NodeId.getLocalId();
@@ -96,19 +98,21 @@ public class CounterMutationTest extends
ContextState state = ContextState.allocate(3, 2);
state.writeElement(NodeId.fromInt(1), 1, 4, false);
state.writeElement(id1, 3, 2, true);
- state.writeElement(id2, -System.currentTimeMillis(), 5, true); //
corrupted!
+ state.writeElement(id2, -100, 5, true); // corrupted!
assert ctx.total(state.context) == 11;
try
{
- ctx.removeOldShards(state.context, Integer.MAX_VALUE);
+ ByteBuffer merger = ctx.computeOldShardMerger(state.context,
Collections.<NodeId.NodeIdRecord>emptyList(), 0);
+ ctx.removeOldShards(ctx.merge(state.context, merger), now);
fail("RemoveOldShards should throw an exception if the current id
is non-sensical");
}
catch (RuntimeException e) {}
NodeId.renewLocalId();
- ByteBuffer cleaned = ctx.removeOldShards(state.context,
Integer.MAX_VALUE);
+ ByteBuffer merger = ctx.computeOldShardMerger(state.context,
Collections.<NodeId.NodeIdRecord>emptyList(), 0);
+ ByteBuffer cleaned = ctx.removeOldShards(ctx.merge(state.context,
merger), now);
assert ctx.total(cleaned) == 11;
// Check it is not corrupted anymore
Modified:
cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/db/context/CounterContextTest.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/db/context/CounterContextTest.java?rev=1198725&r1=1198724&r2=1198725&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/db/context/CounterContextTest.java
(original)
+++
cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/db/context/CounterContextTest.java
Mon Nov 7 13:26:00 2011
@@ -326,54 +326,31 @@ public class CounterContextTest
records.add(new NodeId.NodeIdRecord(id1, 2L));
records.add(new NodeId.NodeIdRecord(id3, 4L));
- // Destination of merge is a delta
- ContextState ctx = ContextState.allocate(5, 2);
- ctx.writeElement(id1, 1L, 1L);
+ ContextState ctx = ContextState.allocate(5, 3);
+ ctx.writeElement(id1, 1L, 1L, true);
ctx.writeElement(NodeId.fromInt(2), 2L, 2L);
ctx.writeElement(id3, 3L, 3L, true);
ctx.writeElement(NodeId.fromInt(4), 6L, 3L);
ctx.writeElement(NodeId.fromInt(5), 7L, 3L, true);
- ByteBuffer merger = cc.computeOldShardMerger(ctx.context, records);
+ ByteBuffer merger = cc.computeOldShardMerger(ctx.context, records,
Integer.MAX_VALUE);
+
ContextState m = new ContextState(merger);
assert m.getNodeId().equals(id1);
assert m.getClock() <= -now;
- assert m.getCount() == 0;
+ assert m.getCount() == -1L;
+ assert m.isDelta();
m.moveToNext();
assert m.getNodeId().equals(id3);
- assert m.getClock() == 4L;
- assert m.getCount() == 1L;
- assert cc.total(ctx.context) == cc.total(cc.merge(ctx.context,
merger));
-
- // Source of merge is a delta
- ctx = ContextState.allocate(4, 1);
- ctx.writeElement(id1, 1L, 1L, true);
- ctx.writeElement(NodeId.fromInt(2), 2L, 2L);
- ctx.writeElement(id3, 3L, 3L);
- ctx.writeElement(NodeId.fromInt(4), 6L, 3L);
-
- merger = cc.computeOldShardMerger(ctx.context, records);
- assert cc.total(ctx.context) == cc.total(cc.merge(ctx.context,
merger));
-
- // source and destination of merge are deltas
- ctx = ContextState.allocate(4, 2);
- ctx.writeElement(id1, 1L, 1L, true);
- ctx.writeElement(NodeId.fromInt(2), 2L, 2L);
- ctx.writeElement(id3, 3L, 3L, true);
- ctx.writeElement(NodeId.fromInt(4), 6L, 3L);
-
- merger = cc.computeOldShardMerger(ctx.context, records);
- assert cc.total(ctx.context) == cc.total(cc.merge(ctx.context,
merger));
-
- // none of source and destination of merge are deltas
- ctx = ContextState.allocate(4, 0);
- ctx.writeElement(id1, 1L, 1L);
- ctx.writeElement(NodeId.fromInt(2), 2L, 2L);
- ctx.writeElement(id3, 3L, 3L);
- ctx.writeElement(NodeId.fromInt(4), 6L, 3L);
-
- merger = cc.computeOldShardMerger(ctx.context, records);
+ assert m.getClock() <= -now;
+ assert m.getCount() == -3L;
+ assert m.isDelta();
+ m.moveToNext();
+ assert m.getNodeId().equals(NodeId.getLocalId());
+ assert m.getClock() == 1L;
+ assert m.getCount() == 4L;
+ assert m.isDelta();
assert cc.total(ctx.context) == cc.total(cc.merge(ctx.context,
merger));
}
@@ -388,29 +365,20 @@ public class CounterContextTest
records.add(new NodeId.NodeIdRecord(id3, 4L));
records.add(new NodeId.NodeIdRecord(id6, 10L));
- ContextState ctx = ContextState.allocate(6, 2);
- ctx.writeElement(id1, 1L, 1L);
+ ContextState ctx = ContextState.allocate(6, 3);
+ ctx.writeElement(id1, 1L, 1L, true);
ctx.writeElement(NodeId.fromInt(2), 2L, 2L);
ctx.writeElement(id3, 3L, 3L, true);
ctx.writeElement(NodeId.fromInt(4), 6L, 3L);
ctx.writeElement(NodeId.fromInt(5), 7L, 3L, true);
ctx.writeElement(id6, 5L, 6L);
- ByteBuffer merger = cc.computeOldShardMerger(ctx.context, records);
+ ByteBuffer merger = cc.computeOldShardMerger(ctx.context, records,
Integer.MAX_VALUE);
ByteBuffer merged = cc.merge(ctx.context, merger);
assert cc.total(ctx.context) == cc.total(merged);
ByteBuffer cleaned = cc.removeOldShards(merged,
(int)(System.currentTimeMillis() / 1000) + 1);
assert cc.total(ctx.context) == cc.total(cleaned);
- assert cleaned.remaining() == ctx.context.remaining() - stepLength;
-
- merger = cc.computeOldShardMerger(cleaned, records);
- merged = cc.merge(cleaned, merger);
- assert cc.total(ctx.context) == cc.total(merged);
-
- cleaned = cc.removeOldShards(merged, (int)(System.currentTimeMillis()
/ 1000) + 1);
- assert cc.total(ctx.context) == cc.total(cleaned);
- assert cleaned.remaining() == ctx.context.remaining() - 2 * stepLength
- 2;
-
+ assert cleaned.remaining() == ctx.context.remaining() - stepLength - 2;
}
}