Author: slebresne
Date: Tue Aug 2 15:02:10 2011
New Revision: 1153156
URL: http://svn.apache.org/viewvc?rev=1153156&view=rev
Log:
Fix assertion error during compaction of counter CFs
patch by slebresne; reviewed by jbellis for CASSANDRA-2968
Modified:
cassandra/branches/cassandra-0.8/CHANGES.txt
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/SystemTable.java
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/context/CounterContext.java
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/utils/NodeId.java
cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/SchemaLoader.java
cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/db/CounterMutationTest.java
Modified: cassandra/branches/cassandra-0.8/CHANGES.txt
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/CHANGES.txt?rev=1153156&r1=1153155&r2=1153156&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.8/CHANGES.txt Tue Aug 2 15:02:10 2011
@@ -24,6 +24,7 @@
* ignore system tables during repair (CASSANDRA-2979)
* throw exception when NTS is given replication_factor as an option
(CASSANDRA-2960)
+ * fix assertion error during compaction of counter CFs (CASSANDRA-2968)
0.8.2
Modified:
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/SystemTable.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/SystemTable.java?rev=1153156&r1=1153155&r2=1153156&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/SystemTable.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/SystemTable.java Tue Aug 2 15:02:10 2011
@@ -404,9 +404,8 @@ public class SystemTable
* table)
* @param newNodeId the new current local node id to record
*/
- public static void writeCurrentLocalNodeId(NodeId oldNodeId, NodeId
newNodeId)
+ public static void writeCurrentLocalNodeId(NodeId oldNodeId, NodeId
newNodeId, long now)
{
- long now = System.currentTimeMillis();
ByteBuffer ip =
ByteBuffer.wrap(FBUtilities.getLocalAddress().getAddress());
ColumnFamily cf = ColumnFamily.create(Table.SYSTEM_TABLE, NODE_ID_CF);
@@ -441,7 +440,7 @@ public class SystemTable
ColumnFamily cf =
table.getColumnFamilyStore(NODE_ID_CF).getColumnFamily(filter);
NodeId previous = null;
- for (IColumn c : cf.getReverseSortedColumns())
+ for (IColumn c : cf)
{
if (previous != null)
l.add(new NodeId.NodeIdRecord(previous, c.timestamp()));
Modified:
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java?rev=1153156&r1=1153155&r2=1153156&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java Tue Aug 2 15:02:10 2011
@@ -193,7 +193,7 @@ public class LazilyCompactedRow extends
assert container != null;
IColumn reduced = container.iterator().next();
ColumnFamily purged = shouldPurge ?
ColumnFamilyStore.removeDeleted(container, controller.gcBefore) : container;
- if (purged != null &&
purged.metadata().getDefaultValidator().isCommutative())
+ if (shouldPurge && purged != null &&
purged.metadata().getDefaultValidator().isCommutative())
{
CounterColumn.removeOldShards(purged, controller.gcBefore);
}
Modified:
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java?rev=1153156&r1=1153155&r2=1153156&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java Tue Aug 2 15:02:10 2011
@@ -82,8 +82,9 @@ public class PrecompactedRow extends Abs
cf.addAll(thisCF);
}
}
- compactedCf = controller.shouldPurge(key) ?
ColumnFamilyStore.removeDeleted(cf, controller.gcBefore) : cf;
- if (compactedCf != null &&
compactedCf.metadata().getDefaultValidator().isCommutative())
+ boolean shouldPurge = controller.shouldPurge(key);
+ compactedCf = shouldPurge ? ColumnFamilyStore.removeDeleted(cf,
controller.gcBefore) : cf;
+ if (shouldPurge && compactedCf != null &&
compactedCf.metadata().getDefaultValidator().isCommutative())
{
CounterColumn.removeOldShards(compactedCf, controller.gcBefore);
}
Modified:
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/context/CounterContext.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/context/CounterContext.java?rev=1153156&r1=1153155&r2=1153156&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/context/CounterContext.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/context/CounterContext.java Tue Aug 2 15:02:10 2011
@@ -21,6 +21,8 @@ import java.nio.ByteBuffer;
import java.security.MessageDigest;
import java.util.*;
+import org.apache.log4j.Logger;
+
import org.apache.cassandra.db.marshal.MarshalException;
import org.apache.cassandra.db.DBConstants;
import org.apache.cassandra.utils.ByteBufferUtil;
@@ -65,6 +67,8 @@ public class CounterContext implements I
private static final int COUNT_LENGTH = DBConstants.longSize_;
private static final int STEP_LENGTH = NodeId.LENGTH + CLOCK_LENGTH +
COUNT_LENGTH;
+ private static final Logger logger =
Logger.getLogger(CounterContext.class);
+
// Time in ms since a node id has been renewed before we consider using it
// during a merge
private static final long MIN_MERGE_DELAY = 5 * 60 * 1000; // should be
aplenty
@@ -524,6 +528,8 @@ public class CounterContext implements I
if (now - currRecord.timestamp < MIN_MERGE_DELAY)
return context;
+ assert !currRecord.id.equals(NodeId.getLocalId());
+
int c = state.getNodeId().compareTo(currRecord.id);
if (c == 0)
{
@@ -539,6 +545,8 @@ public class CounterContext implements I
}
else
{
+ assert !foundState.getNodeId().equals(state.getNodeId());
+
// Found someone to merge it to
int nbDelta = foundState.isDelta() ? 1 : 0;
nbDelta += state.isDelta() ? 1 : 0;
@@ -585,20 +593,24 @@ public class CounterContext implements I
int hlength = headerLength(context);
ContextState state = new ContextState(context, hlength);
int removedBodySize = 0, removedHeaderSize = 0;
+ boolean forceFixing = false;
while (state.hasRemaining())
{
long clock = state.getClock();
- if (clock < 0 && -((int)(clock / 1000)) < gcBefore)
+ if (clock < 0 && -((int)(clock / 1000)) < gcBefore &&
(state.getCount() == 0 || !state.isDelta()))
{
- assert state.getCount() == 0;
removedBodySize += STEP_LENGTH;
if (state.isDelta())
removedHeaderSize += HEADER_ELT_LENGTH;
}
+ else if (clock < 0 && state.getCount() != 0 && state.isDelta())
+ {
+ forceFixing = true;
+ }
state.moveToNext();
}
- if (removedBodySize == 0)
+ if (removedBodySize == 0 && !forceFixing)
return context;
int newSize = context.remaining() - removedHeaderSize -
removedBodySize;
@@ -608,16 +620,31 @@ public class CounterContext implements I
ContextState cleaned = new ContextState(cleanedContext, newHlength);
state.reset();
+ long toAddBack = 0;
while (state.hasRemaining())
{
long clock = state.getClock();
- if (clock > 0 || -((int)(clock / 1000)) >= gcBefore)
+ if (!(clock < 0 && -((int)(clock / 1000)) < gcBefore &&
(state.getCount() == 0 || !state.isDelta())))
{
- state.copyTo(cleaned);
+ if (clock < 0 && state.getCount() != 0 && state.isDelta())
+ {
+ // we should not get there, but we have been creating
problematic context prior to #2968
+ if (state.getNodeId().equals(NodeId.getLocalId()))
+ throw new RuntimeException("Merged counter shard with
a count != 0 (likely due to #2968). You need to restart this node with
-Dcassandra.renew_counter_id=true to fix.");
+
+ // we will "fix" it, but log a message
+ logger.info("Collectable old shard with a count != 0. Will
fix.");
+ cleaned.writeElement(state.getNodeId(), clock - 1L, 0,
true);
+ toAddBack += state.getCount();
+ }
+ else
+ {
+ state.copyTo(cleaned);
+ }
}
state.moveToNext();
}
- return cleanedContext;
+ return toAddBack == 0 ? cleanedContext : merge(cleanedContext,
create(toAddBack));
}
/**
Modified:
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/utils/NodeId.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/utils/NodeId.java?rev=1153156&r1=1153155&r2=1153156&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/utils/NodeId.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/utils/NodeId.java Tue Aug 2 15:02:10 2011
@@ -192,7 +192,7 @@ public class NodeId implements Comparabl
// no recorded local node id, generating a new one and saving
it
id = generate();
logger.info("No saved local node id, using newly generated:
{}", id);
- SystemTable.writeCurrentLocalNodeId(null, id);
+ SystemTable.writeCurrentLocalNodeId(null, id,
System.currentTimeMillis());
current = new AtomicReference<NodeId>(id);
olds = new CopyOnWriteArrayList();
}
@@ -206,11 +206,12 @@ public class NodeId implements Comparabl
synchronized void renewCurrent()
{
+ long now = System.currentTimeMillis();
NodeId newNodeId = generate();
NodeId old = current.get();
- SystemTable.writeCurrentLocalNodeId(old, newNodeId);
+ SystemTable.writeCurrentLocalNodeId(old, newNodeId, now);
current.set(newNodeId);
- olds.add(new NodeIdRecord(old));
+ olds.add(new NodeIdRecord(old, now));
}
}
@@ -219,15 +220,27 @@ public class NodeId implements Comparabl
public final NodeId id;
public final long timestamp;
- public NodeIdRecord(NodeId id)
- {
- this(id, System.currentTimeMillis());
- }
-
public NodeIdRecord(NodeId id, long timestamp)
{
this.id = id;
this.timestamp = timestamp;
}
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o)
+ return true;
+ if (o == null || getClass() != o.getClass())
+ return false;
+
+ NodeIdRecord otherRecord = (NodeIdRecord)o;
+ return id.equals(otherRecord.id) && timestamp ==
otherRecord.timestamp;
+ }
+
+ public String toString()
+ {
+ return String.format("(%s, %d)", id.toString(), timestamp);
+ }
}
}
Modified:
cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/SchemaLoader.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/SchemaLoader.java?rev=1153156&r1=1153155&r2=1153156&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/SchemaLoader.java (original)
+++ cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/SchemaLoader.java Tue Aug 2 15:02:10 2011
@@ -137,7 +137,8 @@ public class SchemaLoader
st,
bytes,
null)
-
.defaultValidator(CounterColumnType.instance),
+
.defaultValidator(CounterColumnType.instance)
+ .mergeShardsChance(1.0),
new CFMetaData(ks1,
"SuperCounter1",
su,
Modified:
cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/db/CounterMutationTest.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/db/CounterMutationTest.java?rev=1153156&r1=1153155&r2=1153156&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/db/CounterMutationTest.java (original)
+++ cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/db/CounterMutationTest.java Tue Aug 2 15:02:10 2011
@@ -18,9 +18,13 @@
package org.apache.cassandra.db;
import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
import org.junit.Test;
+import static org.junit.Assert.fail;
+import org.apache.cassandra.db.context.CounterContext;
import org.apache.cassandra.db.filter.*;
import org.apache.cassandra.CleanupHelper;
import org.apache.cassandra.thrift.*;
@@ -63,4 +67,60 @@ public class CounterMutationTest extends
assert sc2.getSubColumns().size() == 1;
assert sc2.getSubColumn(bytes("Column2")) != null;
}
+
+ @Test
+ public void testGetOldShardFromSystemTable() throws IOException
+ {
+ // Renewing a bunch of times and checking we get the same thing from
+ // the system table that what is in memory
+ NodeId.renewLocalId();
+ NodeId.renewLocalId();
+ NodeId.renewLocalId();
+
+ List<NodeId.NodeIdRecord> inMem = NodeId.getOldLocalNodeIds();
+ List<NodeId.NodeIdRecord> onDisk = SystemTable.getOldLocalNodeIds();
+
+ assert inMem.equals(onDisk);
+ }
+
+ @Test
+ public void testRemoveOldShardFixCorrupted() throws IOException
+ {
+ CounterContext ctx = CounterContext.instance();
+
+ // Check that corrupted context created prior to #2968 are fixed by
removeOldShards
+ NodeId id1 = NodeId.getLocalId();
+ NodeId.renewLocalId();
+ NodeId id2 = NodeId.getLocalId();
+
+ ContextState state = ContextState.allocate(3, 2);
+ state.writeElement(NodeId.fromInt(1), 1, 4, false);
+ state.writeElement(id1, 3, 2, true);
+ state.writeElement(id2, -System.currentTimeMillis(), 5, true); //
corrupted!
+
+ assert ctx.total(state.context) == 11;
+
+ try
+ {
+ ctx.removeOldShards(state.context, Integer.MAX_VALUE);
+ fail("RemoveOldShards should throw an exception if the current id
is non-sensical");
+ }
+ catch (RuntimeException e) {}
+
+ NodeId.renewLocalId();
+ ByteBuffer cleaned = ctx.removeOldShards(state.context,
Integer.MAX_VALUE);
+ assert ctx.total(cleaned) == 11;
+
+ // Check it is not corrupted anymore
+ ContextState state2 = new ContextState(cleaned);
+ while (state2.hasRemaining())
+ {
+ assert state2.getClock() >= 0 || state2.getCount() == 0;
+ state2.moveToNext();
+ }
+
+ // Check that if we merge old and clean on another node, we keep the
right count
+ ByteBuffer onRemote = ctx.merge(ctx.clearAllDelta(state.context),
ctx.clearAllDelta(cleaned));
+ assert ctx.total(onRemote) == 11;
+ }
}