Author: jbellis Date: Wed Jul 20 15:07:09 2011 New Revision: 1148811 URL: http://svn.apache.org/viewvc?rev=1148811&view=rev Log: add cleanupIfNecessary for single-pass streaming SSTable build. Patch by Yuki Morishita; reviewed by stuhood and jbellis for CASSANDRA-2906
Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java?rev=1148811&r1=1148810&r2=1148811&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java Wed Jul 20 15:07:09 2011 @@ -161,80 +161,87 @@ public class IncomingStreamReader SSTableWriter writer = new SSTableWriter(localFile.getFilename(), remoteFile.estimatedKeys); CompactionController controller = null; - BytesReadTracker in = new BytesReadTracker(input); - - for (Pair<Long, Long> section : localFile.sections) + try { - long length = section.right - section.left; - long bytesRead = 0; - while (bytesRead < length) + BytesReadTracker in = new BytesReadTracker(input); + + for (Pair<Long, Long> section : localFile.sections) { - in.reset(); - key = SSTableReader.decodeKey(StorageService.getPartitioner(), localFile.desc, ByteBufferUtil.readWithShortLength(in)); - long dataSize = SSTableReader.readRowSize(in, localFile.desc); - ColumnFamily cf = null; - if (cfs.metadata.getDefaultValidator().isCommutative()) + long length = section.right - section.left; + long bytesRead = 0; + while (bytesRead < length) { - // take care of counter column family - if (controller == null) - controller = new CompactionController(cfs, Collections.<SSTableReader>emptyList(), Integer.MAX_VALUE, true); - SSTableIdentityIterator iter = new SSTableIdentityIterator(cfs.metadata, in, key, 0, dataSize, true); - AbstractCompactedRow row = controller.getCompactedRow(iter); - writer.append(row); - - if (row instanceof PrecompactedRow) + in.reset(); + key = 
SSTableReader.decodeKey(StorageService.getPartitioner(), localFile.desc, ByteBufferUtil.readWithShortLength(in)); + long dataSize = SSTableReader.readRowSize(in, localFile.desc); + ColumnFamily cf = null; + if (cfs.metadata.getDefaultValidator().isCommutative()) { - // we do not purge so we should not get a null here - cf = ((PrecompactedRow)row).getFullColumnFamily(); + // take care of counter column family + if (controller == null) + controller = new CompactionController(cfs, Collections.<SSTableReader>emptyList(), Integer.MAX_VALUE, true); + SSTableIdentityIterator iter = new SSTableIdentityIterator(cfs.metadata, in, key, 0, dataSize, true); + AbstractCompactedRow row = controller.getCompactedRow(iter); + writer.append(row); + + if (row instanceof PrecompactedRow) + { + // we do not purge so we should not get a null here + cf = ((PrecompactedRow)row).getFullColumnFamily(); + } } - } - else - { - // skip BloomFilter - IndexHelper.skipBloomFilter(in); - // skip Index - IndexHelper.skipIndex(in); - - // restore ColumnFamily - cf = ColumnFamily.create(cfs.metadata); - ColumnFamily.serializer().deserializeFromSSTableNoColumns(cf, in); - ColumnFamily.serializer().deserializeColumns(in, cf, true, true); + else + { + // skip BloomFilter + IndexHelper.skipBloomFilter(in); + // skip Index + IndexHelper.skipIndex(in); + + // restore ColumnFamily + cf = ColumnFamily.create(cfs.metadata); + ColumnFamily.serializer().deserializeFromSSTableNoColumns(cf, in); + ColumnFamily.serializer().deserializeColumns(in, cf, true, true); - // write key and cf - writer.append(key, cf); - } + // write key and cf + writer.append(key, cf); + } - // update cache - ColumnFamily cached = cfs.getRawCachedRow(key); - if (cached != null) - { - switch (remoteFile.type) + // update cache + ColumnFamily cached = cfs.getRawCachedRow(key); + if (cached != null) { - case AES: - if (dataSize > DatabaseDescriptor.getInMemoryCompactionLimit()) - { - // We have a key in cache for a very big row, that is 
fishy. We don't fail here however because that would prevent the sstable - // from being built (and there is no real point anyway), so we just invalidate the row for correction and log a warning. - logger.warn("Found a cached row over the in memory compaction limit during post-streaming rebuilt; it is highly recommended to avoid huge row on column family with row cache enabled."); + switch (remoteFile.type) + { + case AES: + if (dataSize > DatabaseDescriptor.getInMemoryCompactionLimit()) + { + // We have a key in cache for a very big row, that is fishy. We don't fail here however because that would prevent the sstable + // from being built (and there is no real point anyway), so we just invalidate the row for correction and log a warning. + logger.warn("Found a cached row over the in memory compaction limit during post-streaming rebuilt; it is highly recommended to avoid huge row on column family with row cache enabled."); + cfs.invalidateCachedRow(key); + } + else + { + assert cf != null; + cfs.updateRowCache(key, cf); + } + break; + default: cfs.invalidateCachedRow(key); - } - else - { - assert cf != null; - cfs.updateRowCache(key, cf); - } - break; - default: - cfs.invalidateCachedRow(key); - break; + break; + } } - } - bytesRead += in.getBytesRead(); - remoteFile.progress += in.getBytesRead(); + bytesRead += in.getBytesRead(); + remoteFile.progress += in.getBytesRead(); + } } + return writer.closeAndOpenReader(); + } + finally + { + writer.cleanupIfNecessary(); } - return writer.closeAndOpenReader(); } private void retry() throws IOException