Author: jbellis
Date: Thu Mar 10 20:28:17 2011
New Revision: 1080338
URL: http://svn.apache.org/viewvc?rev=1080338&view=rev
Log:
merge from 0.7
Modified:
cassandra/trunk/ (props changed)
cassandra/trunk/CHANGES.txt
cassandra/trunk/build.xml
cassandra/trunk/conf/cassandra.yaml
cassandra/trunk/contrib/ (props changed)
cassandra/trunk/contrib/pig/bin/pig_cassandra
cassandra/trunk/contrib/pig/build.xml
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
(props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
(props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
(props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
(props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
(props changed)
cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableExport.java
cassandra/trunk/test/unit/org/apache/cassandra/db/CommitLogTest.java
Propchange: cassandra/trunk/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Mar 10 20:28:17 2011
@@ -1,5 +1,5 @@
/cassandra/branches/cassandra-0.6:922689-1052356,1052358-1053452,1053454,1053456-1071777,1076891
-/cassandra/branches/cassandra-0.7:1026516-1079936
+/cassandra/branches/cassandra-0.7:1026516-1080312
/cassandra/branches/cassandra-0.7.0:1053690-1055654
/cassandra/tags/cassandra-0.7.0-rc3:1051699-1053689
/incubator/cassandra/branches/cassandra-0.3:774578-796573
Modified: cassandra/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1080338&r1=1080337&r2=1080338&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Thu Mar 10 20:28:17 2011
@@ -27,6 +27,9 @@
* avoid writing empty rows when scrubbing tombstoned rows (CASSANDRA-2296)
* fix assertion error in range and index scans for CL < ALL
(CASSANDRA-2282)
+ * fix commitlog replay when flush position refers to data that didn't
+ get synced before server died (CASSANDRA-2285)
+ * fix fd leak in sstable2json with non-mmap'd i/o (CASSANDRA-2304)
0.7.3
Modified: cassandra/trunk/build.xml
URL:
http://svn.apache.org/viewvc/cassandra/trunk/build.xml?rev=1080338&r1=1080337&r2=1080338&view=diff
==============================================================================
--- cassandra/trunk/build.xml (original)
+++ cassandra/trunk/build.xml Thu Mar 10 20:28:17 2011
@@ -631,7 +631,16 @@
<fileset dir="@{inputdir}" includes="@{filter}" />
</batchtest>
</junit>
- <fail if="testfailed" message="Some @{suitename} test(s) failed."/>
+ <fail message="Some @{suitename} test(s) failed.">
+ <condition>
+ <and>
+ <isset property="testfailed"/>
+ <not>
+ <isset property="ant.test.failure.ignore"/>
+ </not>
+ </and>
+ </condition>
+ </fail>
</sequential>
</macrodef>
Modified: cassandra/trunk/conf/cassandra.yaml
URL:
http://svn.apache.org/viewvc/cassandra/trunk/conf/cassandra.yaml?rev=1080338&r1=1080337&r2=1080338&view=diff
==============================================================================
--- cassandra/trunk/conf/cassandra.yaml (original)
+++ cassandra/trunk/conf/cassandra.yaml Thu Mar 10 20:28:17 2011
@@ -87,7 +87,7 @@ commitlog_rotation_threshold_in_mb: 128
# performing the sync.
commitlog_sync: periodic
-# the other option is "timed," where writes may be acked immediately
+# the other option is "periodic" where writes may be acked immediately
# and the CommitLog is simply synced every commitlog_sync_period_in_ms
# milliseconds.
commitlog_sync_period_in_ms: 10000
Propchange: cassandra/trunk/contrib/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Mar 10 20:28:17 2011
@@ -1,5 +1,5 @@
/cassandra/branches/cassandra-0.6/contrib:922689-1052356,1052358-1053452,1053454,1053456-1068009
-/cassandra/branches/cassandra-0.7/contrib:1026516-1079936
+/cassandra/branches/cassandra-0.7/contrib:1026516-1080312
/cassandra/branches/cassandra-0.7.0/contrib:1053690-1055654
/cassandra/tags/cassandra-0.7.0-rc3/contrib:1051699-1053689
/incubator/cassandra/branches/cassandra-0.3/contrib:774578-796573
Modified: cassandra/trunk/contrib/pig/bin/pig_cassandra
URL:
http://svn.apache.org/viewvc/cassandra/trunk/contrib/pig/bin/pig_cassandra?rev=1080338&r1=1080337&r2=1080338&view=diff
==============================================================================
--- cassandra/trunk/contrib/pig/bin/pig_cassandra (original)
+++ cassandra/trunk/contrib/pig/bin/pig_cassandra Thu Mar 10 20:28:17 2011
@@ -24,13 +24,13 @@ for jar in $cassandra_home/lib/*.jar $ca
CLASSPATH=$CLASSPATH:$jar
done
-# cassandra_loadfunc jar.
-LOADFUNC_JAR=`ls -1 $cwd/../build/*.jar`
-if [ ! -e $LOADFUNC_JAR ]; then
- echo "Unable to locate cassandra_loadfunc jar: please run ant." >&2
+# cassandra_storage jar.
+STORAGE_JAR=`ls -1 $cwd/../build/*.jar`
+if [ ! -e $STORAGE_JAR ]; then
+ echo "Unable to locate cassandra_storage jar: please run ant." >&2
exit 1
fi
-CLASSPATH=$CLASSPATH:$LOADFUNC_JAR
+CLASSPATH=$CLASSPATH:$STORAGE_JAR
if [ "x$PIG_HOME" = "x" ]; then
echo "PIG_HOME not set: requires Pig >= 0.7.0" >&2
Modified: cassandra/trunk/contrib/pig/build.xml
URL:
http://svn.apache.org/viewvc/cassandra/trunk/contrib/pig/build.xml?rev=1080338&r1=1080337&r2=1080338&view=diff
==============================================================================
--- cassandra/trunk/contrib/pig/build.xml (original)
+++ cassandra/trunk/contrib/pig/build.xml Thu Mar 10 20:28:17 2011
@@ -17,7 +17,7 @@
~ specific language governing permissions and limitations
~ under the License.
-->
-<project basedir="." default="jar" name="cassandra_loadfunc">
+<project basedir="." default="jar" name="cassandra_storage">
<!-- stores the environment for locating PIG_HOME -->
<property environment="env" />
<property name="cassandra.dir" value="../.." />
@@ -32,7 +32,7 @@
<property name="build.lib" value="${basedir}/lib" />
<property name="build.out" value="${basedir}/build" />
<property name="build.classes" value="${build.out}/classes" />
- <property name="final.name" value="cassandra_loadfunc" />
+ <property name="final.name" value="cassandra_storage" />
<path id="pig.classpath">
<fileset file="${env.PIG_HOME}/pig*.jar" />
Propchange:
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Mar 10 20:28:17 2011
@@ -1,5 +1,5 @@
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-1052356,1052358-1053452,1053454,1053456-1071777,1076891
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1079936
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1080312
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1053690-1055654
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1051699-1053689
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/Cassandra.java:774578-796573
Propchange:
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Mar 10 20:28:17 2011
@@ -1,5 +1,5 @@
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-1052356,1052358-1053452,1053454,1053456-1071777,1076891
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1079936
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1080312
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1053690-1055654
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1051699-1053689
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/column_t.java:774578-792198
Propchange:
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Mar 10 20:28:17 2011
@@ -1,5 +1,5 @@
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-1052356,1052358-1053452,1053454,1053456-1071777,1076891
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1079936
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1080312
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1053690-1055654
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1051699-1053689
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:774578-796573
Propchange:
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Mar 10 20:28:17 2011
@@ -1,5 +1,5 @@
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-1052356,1052358-1053452,1053454,1053456-1071777,1076891
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1079936
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1080312
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1053690-1055654
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1051699-1053689
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:774578-796573
Propchange:
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Mar 10 20:28:17 2011
@@ -1,5 +1,5 @@
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-1052356,1052358-1053452,1053454,1053456-1071777,1076891
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1079936
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1080312
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1053690-1055654
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1051699-1053689
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/superColumn_t.java:774578-792198
Modified:
cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1080338&r1=1080337&r2=1080338&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Thu Mar 10 20:28:17 2011
@@ -32,6 +32,7 @@ import javax.management.ObjectName;
import com.google.common.collect.Iterables;
import org.apache.commons.collections.IteratorUtils;
+import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -374,7 +375,6 @@ public class ColumnFamilyStore implement
{
public void run()
{
- logger.info("Creating index {}.{}", table,
indexedCfMetadata.cfName);
try
{
forceBlockingFlush();
@@ -388,7 +388,6 @@ public class ColumnFamilyStore implement
throw new AssertionError(e);
}
buildSecondaryIndexes(getSSTables(),
FBUtilities.singleton(info.name));
- logger.info("Index {} complete", indexedCfMetadata.cfName);
SystemTable.setIndexBuilt(table.name,
indexedCfMetadata.cfName);
}
};
@@ -397,7 +396,8 @@ public class ColumnFamilyStore implement
public void buildSecondaryIndexes(Collection<SSTableReader> sstables,
SortedSet<ByteBuffer> columns)
{
- logger.debug("Submitting index build to compactionmanager");
+ logger.info(String.format("Submitting index build of %s for data in
%s",
+ metadata.comparator.getString(columns),
StringUtils.join(sstables, ", ")));
Table.IndexBuilder builder = table.createIndexBuilder(this, columns,
new ReducingKeyIterator(sstables));
Future future = CompactionManager.instance.submitIndexBuild(this,
builder);
try
@@ -414,6 +414,7 @@ public class ColumnFamilyStore implement
{
throw new RuntimeException(e);
}
+ logger.info("Index build of " + metadata.comparator.getString(columns)
+ " complete");
}
// called when dropping or renaming a CF. Performs mbean housekeeping and
invalidates CFS to other operations.
@@ -710,18 +711,22 @@ public class ColumnFamilyStore implement
/** flush the given memtable and swap in a new one for its CFS, if it
hasn't been frozen already. threadsafe. */
Future<?> maybeSwitchMemtable(Memtable oldMemtable, final boolean
writeCommitLog)
{
- if (oldMemtable.isPendingFlush())
+ // Only one thread will succeed in marking it as pending flush; the
others can go back to processing writes
+ if (!oldMemtable.markPendingFlush())
+ {
+ logger.debug("memtable is already pending flush; another thread
must be flushing it");
return null;
+ }
+ assert memtable == oldMemtable;
boolean isDropped = isIndex()
? DatabaseDescriptor.getCFMetaData(table.name,
getParentColumnfamily()) == null
: DatabaseDescriptor.getCFMetaData(metadata.cfId) ==
null;
if (isDropped)
- return null; // column family was dropped. no point in flushing.
-
- // Only one thread will succeed in marking it as pending flush; the
others can go back to processing writes
- if (!oldMemtable.markPendingFlush())
+ {
+ logger.debug("column family was dropped; no point in flushing");
return null;
+ }
// Table.flusherLock ensures that we schedule discardCompletedSegments
calls in the same order as their
// contexts (commitlog position) were read, even though the flush
executor is multithreaded.
@@ -729,15 +734,14 @@ public class ColumnFamilyStore implement
try
{
final CommitLogSegment.CommitLogContext ctx = writeCommitLog ?
CommitLog.instance.getContext() : null;
- logger.info("switching in a fresh Memtable for " + columnFamily +
" at " + ctx);
// submit the memtable for any indexed sub-cfses, and our own.
List<ColumnFamilyStore> icc = new
ArrayList<ColumnFamilyStore>(indexedColumns.size());
- icc.add(this);
- for (ColumnFamilyStore indexCfs : indexedColumns.values())
+ // don't assume that this.memtable is dirty; forceFlush can bring
us here during index build even if it is not
+ for (ColumnFamilyStore cfs :
Iterables.concat(Collections.singleton(this), indexedColumns.values()))
{
- if (!indexCfs.memtable.isClean())
- icc.add(indexCfs);
+ if (!cfs.memtable.isClean())
+ icc.add(cfs);
}
final CountDownLatch latch = new CountDownLatch(icc.size());
for (ColumnFamilyStore cfs : icc)
@@ -756,6 +760,10 @@ public class ColumnFamilyStore implement
submitFlush(pendingFlush, latch);
}
}
+ // we marked our memtable as frozen as part of the concurrency
control,
+ // so even if there was nothing to flush we need to switch it out
+ if (!icc.contains(this))
+ memtable = new Memtable(this);
// when all the memtables have been written, including for
indexes, mark the flush in the commitlog header.
// a second executor makes sure the onMemtableFlushes get called
in the right order,
@@ -799,8 +807,17 @@ public class ColumnFamilyStore implement
public Future<?> forceFlush()
{
- if (memtable.isClean())
+ // during index build, 2ary index memtables can be dirty even if
parent is not. if so,
+ // we want flushLargestMemtables to flush the 2ary index ones too.
+ boolean clean = true;
+ for (ColumnFamilyStore cfs :
Iterables.concat(Collections.singleton(this), getIndexColumnFamilyStores()))
+ clean &= cfs.memtable.isClean();
+
+ if (clean)
+ {
+ logger.debug("forceFlush requested but everything is clean");
return null;
+ }
return maybeSwitchMemtable(memtable, true);
}
@@ -1987,6 +2004,11 @@ public class ColumnFamilyStore implement
return indexedColumns.get(column);
}
+ public Collection<ColumnFamilyStore> getIndexColumnFamilyStores()
+ {
+ return indexedColumns.values();
+ }
+
public ColumnFamily newIndexedColumnFamily(ByteBuffer column)
{
return ColumnFamily.create(indexedColumns.get(column).metadata);
Modified:
cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java?rev=1080338&r1=1080337&r2=1080338&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java Thu Mar 10 20:28:17 2011
@@ -191,11 +191,14 @@ public class CommitLog
logger.info(headerPath + " incomplete, missing or corrupt.
Everything is ok, don't panic. CommitLog will be replayed from the
beginning");
logger.debug("exception was", ioe);
}
- if (replayPosition < 0)
+ if (replayPosition < 0 || replayPosition > reader.length())
{
+ // replayPosition > reader.length() can happen if some
data gets flushed before it is written to the commitlog
+ // (see
https://issues.apache.org/jira/browse/CASSANDRA-2285)
logger.debug("skipping replay of fully-flushed {}", file);
continue;
}
+
reader.seek(replayPosition);
if (logger.isDebugEnabled())
Modified:
cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=1080338&r1=1080337&r2=1080338&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Thu Mar 10 20:28:17 2011
@@ -32,6 +32,7 @@ import javax.management.ObjectName;
import com.google.common.base.Charsets;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Iterables;
import com.google.common.collect.Multimap;
import org.apache.cassandra.db.commitlog.CommitLog;
@@ -2330,14 +2331,28 @@ public class StorageService implements I
ColumnFamilyStore largestByThroughput = null;
for (ColumnFamilyStore cfs : ColumnFamilyStore.all())
{
- if (largestByOps == null || cfs.getMemtableColumnsCount() >
largestByOps.getMemtableColumnsCount())
+ long ops = 0;
+ long throughput = 0;
+ for (ColumnFamilyStore subordinate :
Iterables.concat(Collections.singleton(cfs), cfs.getIndexColumnFamilyStores()))
+ {
+ ops += subordinate.getMemtableColumnsCount();
+ throughput = subordinate.getMemtableThroughputInMB();
+ }
+
+ if (ops > 0 && (largestByOps == null || ops >
largestByOps.getMemtableColumnsCount()))
+ {
+ logger_.debug(ops + " total ops in " + cfs);
largestByOps = cfs;
- if (largestByThroughput == null || cfs.getMemtableThroughputInMB()
> largestByThroughput.getMemtableThroughputInMB())
+ }
+ if (throughput > 0 && (largestByThroughput == null || throughput >
largestByThroughput.getMemtableThroughputInMB()))
+ {
+ logger_.debug(throughput + " total throughput in " + cfs);
largestByThroughput = cfs;
+ }
}
if (largestByOps == null)
{
- logger_.error("Unable to reduce heap usage since there are no
column families defined");
+ logger_.info("Unable to reduce heap usage since there are no dirty
column families");
return;
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableExport.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableExport.java?rev=1080338&r1=1080337&r2=1080338&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableExport.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableExport.java Thu Mar 10 20:28:17 2011
@@ -19,6 +19,7 @@
package org.apache.cassandra.tools;
import java.io.File;
+import java.io.IOError;
import java.io.IOException;
import java.io.PrintStream;
import java.nio.ByteBuffer;
@@ -39,6 +40,7 @@ import org.apache.cassandra.config.Confi
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.io.sstable.*;
+import org.apache.cassandra.utils.Pair;
import static org.apache.cassandra.utils.ByteBufferUtil.bytesToHex;
import static org.apache.cassandra.utils.ByteBufferUtil.hexToBytes;
@@ -101,28 +103,23 @@ public class SSTableExport
* @param out output stream
* @param comparator columns comparator
* @param cfMetaData Column Family metadata (to get validator)
+ * @return pair of (number of columns serialized, last column serialized)
*/
- private static void serializeColumns(Iterator<IColumn> columns,
PrintStream out, AbstractType comparator, CFMetaData cfMetaData)
+ private static Pair<Integer, ByteBuffer>
serializeColumns(Iterator<IColumn> columns, PrintStream out, AbstractType
comparator, CFMetaData cfMetaData)
{
+ int n = 0;
+ IColumn column = null;
while (columns.hasNext())
{
- serializeColumn(columns.next(), out, comparator, cfMetaData);
+ column = columns.next();
+ n++;
+ serializeColumn(column, out, comparator, cfMetaData);
if (columns.hasNext())
out.print(", ");
}
- }
- /**
- * Serialize a collection of the columns
- * @param columns collection of the columns to serialize
- * @param out output stream
- * @param comparator columns comparator
- * @param cfMetaData Column Family metadata (to get validator)
- */
- private static void serializeColumns(Collection<IColumn> columns,
PrintStream out, AbstractType comparator, CFMetaData cfMetaData)
- {
- serializeColumns(columns.iterator(), out, comparator, cfMetaData);
+ return new Pair<Integer, ByteBuffer>(n, column == null ? null :
column.name());
}
/**
@@ -198,25 +195,29 @@ public class SSTableExport
IColumnIterator columns = filter.getSSTableColumnIterator(reader);
- int columnCount = 0;
- while (columns.hasNext())
- {
- // setting new start column to the last of the current columns
- startColumn = columns.next().name();
- columnCount++;
- }
-
+ Pair<Integer, ByteBuffer> serialized;
try
{
- columns = filter.getSSTableColumnIterator(reader); // iterator
reset
- serializeRow(columns, isSuperCF, out);
+ serialized = serializeRow(columns, isSuperCF, out);
}
catch (IOException e)
{
System.err.println("WARNING: Corrupt row " + key + "
(skipping).");
+ continue;
+ }
+ finally
+ {
+ try
+ {
+ columns.close();
+ }
+ catch (IOException e)
+ {
+ throw new IOError(e);
+ }
}
- if (columnCount < PAGE_SIZE)
+ if (serialized.left < PAGE_SIZE)
break;
out.print(",");
@@ -231,10 +232,11 @@ public class SSTableExport
* @param columns columns of the row
* @param isSuper true if wrapping Column Family is Super
* @param out output stream
+ * @return pair of (number of columns serialized, last column serialized)
*
* @throws IOException on any I/O error.
*/
- private static void serializeRow(IColumnIterator columns, boolean isSuper,
PrintStream out) throws IOException
+ private static Pair<Integer, ByteBuffer> serializeRow(IColumnIterator
columns, boolean isSuper, PrintStream out) throws IOException
{
ColumnFamily columnFamily = columns.getColumnFamily();
CFMetaData cfMetaData = columnFamily.metadata();
@@ -243,9 +245,12 @@ public class SSTableExport
if (isSuper)
{
+ int n = 0;
+ IColumn column = null;
while (columns.hasNext())
{
- IColumn column = columns.next();
+ column = columns.next();
+ n++;
out.print(asKey(comparator.getString(column.name())));
out.print("{");
@@ -254,17 +259,19 @@ public class SSTableExport
out.print(", ");
out.print(asKey("subColumns"));
out.print("[");
- serializeColumns(column.getSubColumns(), out,
columnFamily.getSubComparator(), cfMetaData);
+ serializeColumns(column.getSubColumns().iterator(), out,
columnFamily.getSubComparator(), cfMetaData);
out.print("]");
out.print("}");
if (columns.hasNext())
out.print(", ");
}
+
+ return new Pair<Integer, ByteBuffer>(n, column == null ? null :
column.name());
}
else
{
- serializeColumns(columns, out, comparator, cfMetaData);
+ return serializeColumns(columns, out, comparator, cfMetaData);
}
}
Modified: cassandra/trunk/test/unit/org/apache/cassandra/db/CommitLogTest.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/CommitLogTest.java?rev=1080338&r1=1080337&r2=1080338&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/db/CommitLogTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/db/CommitLogTest.java Thu Mar 10 20:28:17 2011
@@ -108,6 +108,30 @@ public class CommitLogTest extends Clean
testRecoveryWithBadSizeArgument(-10, 10); // negative size, but no EOF
}
+ @Test
+ public void testRecoveryWithHeaderPositionGreaterThanLogLength() throws
Exception
+ {
+ // Note: this can actually happen (in periodic mode) when data is
flushed
+ // before it had time to hit the commitlog (since the header is
flushed by the system)
+ // see https://issues.apache.org/jira/browse/CASSANDRA-2285
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ DataOutputStream dos = new DataOutputStream(out);
+ Checksum checksum = new CRC32();
+
+ // write the first checksum after the fixed-size part, so we won't
read garbage lastFlushedAt data.
+ dos.writeInt(1);
+ checksum.update(1);
+ dos.writeLong(checksum.getValue());
+ dos.writeInt(0);
+ checksum.update(0);
+ dos.writeInt(200);
+ checksum.update(200);
+ dos.writeLong(checksum.getValue());
+ dos.close();
+
+ testRecovery(out.toByteArray(), new byte[0]);
+ }
+
protected void testRecoveryWithBadSizeArgument(int size, int dataSize)
throws Exception
{
Checksum checksum = new CRC32();