Author: brandonwilliams Date: Fri Jul 23 19:22:58 2010 New Revision: 967213
URL: http://svn.apache.org/viewvc?rev=967213&view=rev Log: Keep persistent row size and column count statistics. Patch by brandonwilliams and jbellis; reviewed by jbellis for CASSANDRA-1155 Added: cassandra/trunk/src/java/org/apache/cassandra/db/StatisticsTable.java Modified: cassandra/trunk/CHANGES.txt cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java cassandra/trunk/src/java/org/apache/cassandra/io/AbstractCompactedRow.java cassandra/trunk/src/java/org/apache/cassandra/io/LazilyCompactedRow.java cassandra/trunk/src/java/org/apache/cassandra/io/PrecompactedRow.java cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTable.java cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java cassandra/trunk/src/java/org/apache/cassandra/utils/EstimatedHistogram.java cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java cassandra/trunk/test/unit/org/apache/cassandra/utils/EstimatedHistogramTest.java Modified: cassandra/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=967213&r1=967212&r2=967213&view=diff ============================================================================== --- cassandra/trunk/CHANGES.txt (original) +++ cassandra/trunk/CHANGES.txt Fri Jul 23 19:22:58 2010 @@ -44,6 +44,7 @@ dev * significantly faster reads from row cache (CASSANDRA-1267) * take advantage of row cache during range queries (CASSANDRA-1302) * make GCGraceSeconds a per-ColumnFamily value (CASSANDRA-1276) + * keep persistent row size and column count statistics (CASSANDRA-1155) 0.6.4 Modified: 
cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java?rev=967213&r1=967212&r2=967213&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java Fri Jul 23 19:22:58 2010 @@ -35,6 +35,7 @@ import org.apache.cassandra.db.clock.Abs import org.apache.cassandra.db.clock.TimestampReconciler; import org.apache.cassandra.db.HintedHandOffManager; import org.apache.cassandra.db.SystemTable; +import org.apache.cassandra.db.StatisticsTable; import org.apache.cassandra.db.Table; import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.db.marshal.BytesType; @@ -64,6 +65,7 @@ public final class CFMetaData public static final CFMetaData HintsCf = new CFMetaData(Table.SYSTEM_TABLE, HintedHandOffManager.HINTS_CF, ColumnFamilyType.Super, ClockType.Timestamp, UTF8Type.instance, BytesType.instance, new TimestampReconciler(), "hinted handoff data", 0, false, 0.01, DEFAULT_GC_GRACE_SECONDS, 1, Collections.<byte[], ColumnDefinition>emptyMap()); public static final CFMetaData MigrationsCf = new CFMetaData(Table.SYSTEM_TABLE, Migration.MIGRATIONS_CF, ColumnFamilyType.Standard, ClockType.Timestamp, TimeUUIDType.instance, null, new TimestampReconciler(), "individual schema mutations", 0, false, 0.01, DEFAULT_GC_GRACE_SECONDS, 2, Collections.<byte[], ColumnDefinition>emptyMap()); public static final CFMetaData SchemaCf = new CFMetaData(Table.SYSTEM_TABLE, Migration.SCHEMA_CF, ColumnFamilyType.Standard, ClockType.Timestamp, UTF8Type.instance, null, new TimestampReconciler(), "current state of the schema", 0, false, 0.01, DEFAULT_GC_GRACE_SECONDS, 3, Collections. 
<byte[], ColumnDefinition>emptyMap()); + public static final CFMetaData StatisticsCf = new CFMetaData(Table.SYSTEM_TABLE, StatisticsTable.STATISTICS_CF, ColumnFamilyType.Super, ClockType.Timestamp, UTF8Type.instance, BytesType.instance, new TimestampReconciler(), "persistent CF statistics for the local node", 0, false, 0.01, DEFAULT_GC_GRACE_SECONDS, 4, Collections.<byte[], ColumnDefinition>emptyMap()); /** * @return An immutable mapping of (ksname,cfname) to id. Modified: cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=967213&r1=967212&r2=967213&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java Fri Jul 23 19:22:58 2010 @@ -348,12 +348,14 @@ public class DatabaseDescriptor KSMetaData systemMeta = new KSMetaData(Table.SYSTEM_TABLE, LocalStrategy.class, 1, new CFMetaData[]{CFMetaData.StatusCf, CFMetaData.HintsCf, CFMetaData.MigrationsCf, - CFMetaData.SchemaCf + CFMetaData.SchemaCf, + CFMetaData.StatisticsCf }); CFMetaData.map(CFMetaData.StatusCf); CFMetaData.map(CFMetaData.HintsCf); CFMetaData.map(CFMetaData.MigrationsCf); CFMetaData.map(CFMetaData.SchemaCf); + CFMetaData.map(CFMetaData.StatisticsCf); tables.put(Table.SYSTEM_TABLE, systemMeta); /* Load the seeds for node contact points */ Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java?rev=967213&r1=967212&r2=967213&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java (original) +++ 
cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java Fri Jul 23 19:22:58 2010 @@ -69,18 +69,20 @@ public class ColumnFamilySerializer impl serializeForSSTable(columnFamily, dos); } - public void serializeForSSTable(ColumnFamily columnFamily, DataOutput dos) + public int serializeForSSTable(ColumnFamily columnFamily, DataOutput dos) { try { serializeCFInfo(columnFamily, dos); Collection<IColumn> columns = columnFamily.getSortedColumns(); - dos.writeInt(columns.size()); + int count = columns.size(); + dos.writeInt(count); for (IColumn column : columns) { columnFamily.getColumnSerializer().serialize(column, dos); } + return count; } catch (IOException e) { @@ -95,10 +97,10 @@ public class ColumnFamilySerializer impl columnFamily.getClockType().serializer().serialize(_markedForDeleteAt, dos); } - public void serializeWithIndexes(ColumnFamily columnFamily, DataOutput dos) + public int serializeWithIndexes(ColumnFamily columnFamily, DataOutput dos) { ColumnIndexer.serialize(columnFamily, dos); - serializeForSSTable(columnFamily, dos); + return serializeForSSTable(columnFamily, dos); } public ColumnFamily deserialize(DataInput dis) throws IOException Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=967213&r1=967212&r2=967213&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Fri Jul 23 19:22:58 2010 @@ -98,7 +98,7 @@ public class ColumnFamilyStore implement TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(DatabaseDescriptor.getFlushWriters()), new NamedThreadFactory("FLUSH-WRITER-POOL")); - private static ExecutorService commitLogUpdater_ = new JMXEnabledThreadPoolExecutor("MEMTABLE-POST-FLUSHER"); + private 
static ExecutorService postFlushExecutor_ = new JMXEnabledThreadPoolExecutor("MEMTABLE-POST-FLUSHER"); private static final FilenameFilter DB_NAME_FILTER = new FilenameFilter() { @@ -133,10 +133,6 @@ public class ColumnFamilyStore implement private LatencyTracker readStats_ = new LatencyTracker(); private LatencyTracker writeStats_ = new LatencyTracker(); - private long minRowCompactedSize = 0L; - private long maxRowCompactedSize = 0L; - private long rowsCompactedTotalSize = 0L; - private long rowsCompactedCount = 0L; final CFMetaData metadata; ColumnFamilyStore(String table, String columnFamilyName, IPartitioner partitioner, int generation, CFMetaData metadata) @@ -257,32 +253,41 @@ public class ColumnFamilyStore implement } } - public void addToCompactedRowStats(long rowsize) - { - if (minRowCompactedSize < 1 || rowsize < minRowCompactedSize) - minRowCompactedSize = rowsize; - if (rowsize > maxRowCompactedSize) - maxRowCompactedSize = rowsize; - rowsCompactedCount++; - rowsCompactedTotalSize += rowsize; - } - public long getMinRowCompactedSize() { - return minRowCompactedSize; + long min = 0; + for (SSTableReader sstable : ssTables_) + { + if (min == 0 || sstable.getEstimatedRowSize().min() < min) + min = sstable.getEstimatedRowSize().min(); + } + return min; } public long getMaxRowCompactedSize() { - return maxRowCompactedSize; + long max = 0; + for (SSTableReader sstable : ssTables_) + { + if (sstable.getEstimatedRowSize().max() > max) + max = sstable.getEstimatedRowSize().max(); + } + return max; } public long getMeanRowCompactedSize() { - if (rowsCompactedCount > 0) - return rowsCompactedTotalSize / rowsCompactedCount; - else - return 0L; + long sum = 0; + long count = 0; + for (SSTableReader sstable : ssTables_) + { + if (sstable.getEstimatedRowSize().median() > 0) + { + sum += sstable.getEstimatedRowSize().median(); + count++; + } + } + return count > 0 ? 
sum / count : 0; } public static ColumnFamilyStore createColumnFamilyStore(String table, String columnFamily) @@ -414,7 +419,7 @@ public class ColumnFamilyStore implement // when all the memtables have been written, including for indexes, mark the flush in the commitlog header. // a second executor makes sure the onMemtableFlushes get called in the right order, // while keeping the wait-for-flush (future.get) out of anything latency-sensitive. - return commitLogUpdater_.submit(new WrappedRunnable() + return postFlushExecutor_.submit(new WrappedRunnable() { public void runMayThrow() throws InterruptedException, IOException { @@ -1295,7 +1300,12 @@ public class ColumnFamilyStore implement } }; - return commitLogUpdater_.submit(runnable); + return postFlushExecutor_.submit(runnable); + } + + public static Future<?> submitPostFlush(Runnable runnable) + { + return postFlushExecutor_.submit(runnable); } public long getBloomFilterFalsePositives() Modified: cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java?rev=967213&r1=967212&r2=967213&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java Fri Jul 23 19:22:58 2010 @@ -348,9 +348,6 @@ public class CompactionManager implement writer.append(row); totalkeysWritten++; - - long rowsize = writer.getFilePointer() - prevpos; - cfs.addToCompactedRowStats(rowsize); } } finally Added: cassandra/trunk/src/java/org/apache/cassandra/db/StatisticsTable.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/StatisticsTable.java?rev=967213&view=auto ============================================================================== --- 
cassandra/trunk/src/java/org/apache/cassandra/db/StatisticsTable.java (added) +++ cassandra/trunk/src/java/org/apache/cassandra/db/StatisticsTable.java Fri Jul 23 19:22:58 2010 @@ -0,0 +1,129 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.db; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Collection; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.db.filter.QueryPath; +import org.apache.cassandra.db.filter.QueryFilter; +import org.apache.cassandra.io.sstable.SSTable; +import org.apache.cassandra.io.sstable.Descriptor; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.EstimatedHistogram; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.service.StorageService; + +import static com.google.common.base.Charsets.UTF_8; + +public class StatisticsTable +{ + private static Logger logger = LoggerFactory.getLogger(StatisticsTable.class); + public static final String STATISTICS_CF = "Statistics"; + public static final byte[] ROWSIZE_SC = "RowSize".getBytes(UTF_8); + public static final byte[] COLUMNCOUNT_SC = "ColumnCount".getBytes(UTF_8); + + private static DecoratedKey decorate(byte[] key) + { + return StorageService.getPartitioner().decorateKey(key); + } + + public static void persistSSTableStatistics(Descriptor desc, EstimatedHistogram rowsizes, EstimatedHistogram columncounts) throws IOException + { + String filename = getRelativePath(desc.filenameFor(SSTable.COMPONENT_DATA)); + if (isTemporary(filename) || desc.ksname.equals(Table.SYSTEM_TABLE)) + return; + long[] rowbuckets = rowsizes.getBucketOffsets(); + long[] rowvalues = rowsizes.get(false); + long[] columnbuckets = columncounts.getBucketOffsets(); + long[] columnvalues = columncounts.get(false); + RowMutation rm = new RowMutation(Table.SYSTEM_TABLE, filename.getBytes(UTF_8)); + for (int i=0; i<rowbuckets.length; i++) + { + QueryPath path = new QueryPath(STATISTICS_CF, ROWSIZE_SC, FBUtilities.toByteArray(rowbuckets[i])); + rm.add(path, FBUtilities.toByteArray(rowvalues[i]), new TimestampClock(System.currentTimeMillis())); + } + for (int i=0; i<columnbuckets.length; i++) + { 
+ QueryPath path = new QueryPath(STATISTICS_CF, COLUMNCOUNT_SC, FBUtilities.toByteArray(columnbuckets[i])); + rm.add(path, FBUtilities.toByteArray(columnvalues[i]), new TimestampClock(System.currentTimeMillis())); + } + rm.apply(); + if (logger.isDebugEnabled()) + logger.debug("Recorded SSTable statistics for " + filename); + } + + public static void deleteSSTableStatistics(String filepath) throws IOException + { + String filename = getRelativePath(filepath); + RowMutation rm = new RowMutation(Table.SYSTEM_TABLE, filename.getBytes(UTF_8)); + QueryPath path = new QueryPath(STATISTICS_CF); + rm.delete(path, new TimestampClock(System.currentTimeMillis())); + rm.apply(); + if (logger.isDebugEnabled()) + logger.debug("Deleted SSTable statistics for " + filename); + } + + private static long[] getSSTableStatistics(String filepath, byte[] superCol) throws IOException + { + long[] rv; + String filename = getRelativePath(filepath); + QueryPath path = new QueryPath(STATISTICS_CF); + QueryFilter filter = QueryFilter.getNamesFilter(decorate(filename.getBytes(UTF_8)), path, superCol); + ColumnFamily cf = Table.open(Table.SYSTEM_TABLE).getColumnFamilyStore(STATISTICS_CF).getColumnFamily(filter); + if (cf == null) + return new long[0]; + IColumn scolumn = cf.getColumn(superCol); + rv = new long[scolumn.getSubColumns().size()]; + int i = 0; + for (IColumn col : scolumn.getSubColumns()) { + rv[i] = ByteBuffer.wrap(col.value()).getLong(); + i++; + } + return rv; + } + + public static long [] getSSTableRowSizeStatistics(String filename) throws IOException + { + return getSSTableStatistics(filename, ROWSIZE_SC); + } + + public static long [] getSSTableColumnCountStatistics(String filename) throws IOException + { + return getSSTableStatistics(filename, COLUMNCOUNT_SC); + } + + private static String getRelativePath(String filename) + { + for (String prefix : DatabaseDescriptor.getAllDataFileLocations()) + { + if (filename.startsWith(prefix)) + return filename.substring(prefix.length()); 
+ } + return filename; + } + + private static boolean isTemporary(String filename) + { + return filename.contains("-" + SSTable.TEMPFILE_MARKER); + } +} Modified: cassandra/trunk/src/java/org/apache/cassandra/io/AbstractCompactedRow.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/AbstractCompactedRow.java?rev=967213&r1=967212&r2=967213&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/io/AbstractCompactedRow.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/io/AbstractCompactedRow.java Fri Jul 23 19:22:58 2010 @@ -25,4 +25,6 @@ public abstract class AbstractCompactedR public abstract void update(MessageDigest digest); public abstract boolean isEmpty(); + + public abstract int columnCount(); } Modified: cassandra/trunk/src/java/org/apache/cassandra/io/LazilyCompactedRow.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/LazilyCompactedRow.java?rev=967213&r1=967212&r2=967213&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/io/LazilyCompactedRow.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/io/LazilyCompactedRow.java Fri Jul 23 19:22:58 2010 @@ -151,6 +151,11 @@ public class LazilyCompactedRow extends return Iterators.filter(iter, Predicates.notNull()); } + public int columnCount() + { + return columnCount; + } + private class LazyColumnIterator extends ReducingIterator<IColumn, IColumn> { ColumnFamily container = emptyColumnFamily.cloneMeShallow(); Modified: cassandra/trunk/src/java/org/apache/cassandra/io/PrecompactedRow.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/PrecompactedRow.java?rev=967213&r1=967212&r2=967213&view=diff ============================================================================== --- 
cassandra/trunk/src/java/org/apache/cassandra/io/PrecompactedRow.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/io/PrecompactedRow.java Fri Jul 23 19:22:58 2010 @@ -22,6 +22,7 @@ public class PrecompactedRow extends Abs private static Logger logger = LoggerFactory.getLogger(PrecompactedRow.class); private final DataOutputBuffer buffer; + private int columnCount = 0; public PrecompactedRow(DecoratedKey key, DataOutputBuffer buffer) { @@ -61,7 +62,7 @@ public class PrecompactedRow extends Abs ColumnFamily cfPurged = major ? ColumnFamilyStore.removeDeleted(cf, gcBefore) : cf; if (cfPurged == null) return; - ColumnFamily.serializer().serializeWithIndexes(cfPurged, buffer); + columnCount = ColumnFamily.serializer().serializeWithIndexes(cfPurged, buffer); } else { @@ -69,6 +70,7 @@ public class PrecompactedRow extends Abs try { rows.get(0).echoData(buffer); + columnCount = rows.get(0).getColumnCount(); } catch (IOException e) { @@ -92,4 +94,9 @@ public class PrecompactedRow extends Abs { return buffer.getLength() == 0; } + + public int columnCount() + { + return columnCount; + } } Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTable.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTable.java?rev=967213&r1=967212&r2=967213&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTable.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTable.java Fri Jul 23 19:22:58 2010 @@ -31,6 +31,8 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.io.util.FileUtils; +import org.apache.cassandra.db.StatisticsTable; +import org.apache.cassandra.utils.EstimatedHistogram; /** * This class is built on top of the SequenceFile. 
It stores @@ -61,6 +63,8 @@ public abstract class SSTable public static final String TEMPFILE_MARKER = "tmp"; public static List<String> components = Collections.unmodifiableList(Arrays.asList(COMPONENT_FILTER, COMPONENT_INDEX, COMPONENT_DATA)); + protected EstimatedHistogram estimatedRowSize = new EstimatedHistogram(130); + protected EstimatedHistogram estimatedColumnCount = new EstimatedHistogram(112); protected SSTable(String filename, IPartitioner partitioner) { @@ -75,6 +79,16 @@ public abstract class SSTable this.partitioner = partitioner; } + public EstimatedHistogram getEstimatedRowSize() + { + return estimatedRowSize; + } + + public EstimatedHistogram getEstimatedColumnCount() + { + return estimatedColumnCount; + } + public IPartitioner getPartitioner() { return partitioner; @@ -119,6 +133,7 @@ public abstract class SSTable FileUtils.deleteWithConfirm(new File(SSTable.indexFilename(dataFilename))); FileUtils.deleteWithConfirm(new File(SSTable.filterFilename(dataFilename))); FileUtils.deleteWithConfirm(new File(SSTable.compactedFilename(dataFilename))); + StatisticsTable.deleteSSTableStatistics(dataFilename); } catch (IOException e) { Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java?rev=967213&r1=967212&r2=967213&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java Fri Jul 23 19:22:58 2010 @@ -26,11 +26,13 @@ import java.lang.ref.Reference; import com.google.common.base.Function; import com.google.common.collect.Collections2; +import org.apache.cassandra.db.StatisticsTable; import org.apache.cassandra.io.util.BufferedRandomAccessFile; import org.apache.cassandra.io.util.SegmentedFile; import 
org.apache.cassandra.utils.BloomFilter; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.Pair; +import org.apache.cassandra.utils.EstimatedHistogram; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -140,6 +142,22 @@ public class SSTableReader extends SSTab return count; } + private void loadStatistics(Descriptor desc) throws IOException + { + // skip loading stats for the system table, or we will infinitely recurse + if (desc.ksname.equals(Table.SYSTEM_TABLE)) + return; + if (logger.isDebugEnabled()) + logger.debug("Load statistics for " + desc); + long[] rowsizes = StatisticsTable.getSSTableRowSizeStatistics(desc.filenameFor(SSTable.COMPONENT_DATA)); + long[] colcounts = StatisticsTable.getSSTableColumnCountStatistics(desc.filenameFor(SSTable.COMPONENT_DATA)); + if (rowsizes.length > 0) + { + estimatedRowSize = new EstimatedHistogram(rowsizes); + estimatedColumnCount = new EstimatedHistogram(colcounts); + } + } + public static SSTableReader open(String dataFileName) throws IOException { return open(Descriptor.fromFilename(dataFileName)); @@ -191,17 +209,38 @@ public class SSTableReader extends SSTab sstable.load(false); sstable.loadBloomFilter(); } + sstable.loadStatistics(desc); return sstable; } + SSTableReader(Descriptor desc, + IPartitioner partitioner, + SegmentedFile ifile, + SegmentedFile dfile, + IndexSummary indexSummary, + BloomFilter bloomFilter, + long maxDataAge) + throws IOException + { + super(desc, partitioner); + this.maxDataAge = maxDataAge; + + + this.ifile = ifile; + this.dfile = dfile; + this.indexSummary = indexSummary; + this.bf = bloomFilter; + } + /** * Open a RowIndexedReader which already has its state initialized (by SSTableWriter). 
*/ - static SSTableReader internalOpen(Descriptor desc, IPartitioner partitioner, SegmentedFile ifile, SegmentedFile dfile, IndexSummary isummary, BloomFilter bf, long maxDataAge) throws IOException + static SSTableReader internalOpen(Descriptor desc, IPartitioner partitioner, SegmentedFile ifile, SegmentedFile dfile, IndexSummary isummary, BloomFilter bf, long maxDataAge, EstimatedHistogram rowsize, + EstimatedHistogram columncount) throws IOException { assert desc != null && partitioner != null && ifile != null && dfile != null && isummary != null && bf != null; - return new SSTableReader(desc, partitioner, ifile, dfile, isummary, bf, maxDataAge); + return new SSTableReader(desc, partitioner, ifile, dfile, isummary, bf, maxDataAge, rowsize, columncount); } SSTableReader(Descriptor desc, @@ -210,7 +249,9 @@ public class SSTableReader extends SSTab SegmentedFile dfile, IndexSummary indexSummary, BloomFilter bloomFilter, - long maxDataAge) + long maxDataAge, + EstimatedHistogram rowsize, + EstimatedHistogram columncount) throws IOException { super(desc, partitioner); @@ -221,6 +262,8 @@ public class SSTableReader extends SSTab this.dfile = dfile; this.indexSummary = indexSummary; this.bf = bloomFilter; + estimatedRowSize = rowsize; + estimatedColumnCount = columncount; } public void setTrackedBy(SSTableTracker tracker) Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java?rev=967213&r1=967212&r2=967213&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java Fri Jul 23 19:22:58 2010 @@ -26,7 +26,9 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.config.DatabaseDescriptor; import 
org.apache.cassandra.db.ColumnFamily; +import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.StatisticsTable; import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.io.AbstractCompactedRow; import org.apache.cassandra.io.util.BufferedRandomAccessFile; @@ -34,6 +36,7 @@ import org.apache.cassandra.io.util.Segm import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.BloomFilter; import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.WrappedRunnable; public class SSTableWriter extends SSTable { @@ -83,6 +86,8 @@ public class SSTableWriter extends SSTab long currentPosition = beforeAppend(row.key); FBUtilities.writeShortByteArray(row.key.key, dataFile); row.write(dataFile); + estimatedRowSize.add(dataFile.getFilePointer() - currentPosition); + estimatedColumnCount.add(row.columnCount()); afterAppend(row.key, currentPosition); } @@ -94,7 +99,7 @@ public class SSTableWriter extends SSTab long sizePosition = dataFile.getFilePointer(); dataFile.writeLong(-1); // write out row data - ColumnFamily.serializer().serializeWithIndexes(cf, dataFile); + int columnCount = ColumnFamily.serializer().serializeWithIndexes(cf, dataFile); // seek back and write the row size (not including the size Long itself) long endPosition = dataFile.getFilePointer(); dataFile.seek(sizePosition); @@ -102,6 +107,8 @@ public class SSTableWriter extends SSTab // finally, reset for next row dataFile.seek(endPosition); afterAppend(decoratedKey, startPosition); + estimatedRowSize.add(endPosition - startPosition); + estimatedColumnCount.add(columnCount); } public void append(DecoratedKey decoratedKey, byte[] value) throws IOException @@ -128,12 +135,21 @@ public class SSTableWriter extends SSTab dataFile.close(); // calls force // remove the 'tmp' marker from all components - Descriptor newdesc = rename(desc); + final Descriptor newdesc = rename(desc); + + 
Runnable runnable = new WrappedRunnable() + { + protected void runMayThrow() throws IOException + { + StatisticsTable.persistSSTableStatistics(newdesc, estimatedRowSize, estimatedColumnCount); + } + }; + ColumnFamilyStore.submitPostFlush(runnable); // finalize in-memory state for the reader SegmentedFile ifile = iwriter.builder.complete(newdesc.filenameFor(SSTable.COMPONENT_INDEX)); SegmentedFile dfile = dbuilder.complete(newdesc.filenameFor(SSTable.COMPONENT_DATA)); - SSTableReader sstable = SSTableReader.internalOpen(newdesc, partitioner, ifile, dfile, iwriter.summary, iwriter.bf, maxDataAge); + SSTableReader sstable = SSTableReader.internalOpen(newdesc, partitioner, ifile, dfile, iwriter.summary, iwriter.bf, maxDataAge, estimatedRowSize, estimatedColumnCount); iwriter = null; dbuilder = null; return sstable; Modified: cassandra/trunk/src/java/org/apache/cassandra/utils/EstimatedHistogram.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/utils/EstimatedHistogram.java?rev=967213&r1=967212&r2=967213&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/utils/EstimatedHistogram.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/utils/EstimatedHistogram.java Fri Jul 23 19:22:58 2010 @@ -25,27 +25,58 @@ public class EstimatedHistogram { /** - * This series starts at 1 and grows by 1.2 each time (rounding down and removing duplicates). It goes from 1 - * to around 30M, which will give us timing resolution from microseconds to 30 seconds, with less precision - * as the numbers get larger. + * The series of values to which the counts in `buckets` correspond: + * 1, 2, 3, 4, 5, 6, 7, 8, 10, 12, 15, 18, 22, etc. + * Thus, a `buckets` of [0, 0, 1, 10] would mean we had seen one value of 3 and 10 values of 4. + * + * The series starts at 1 and grows by 1.2 each time (rounding and removing duplicates). 
It goes from 1 + * to around 25M by default (creating 90+1 buckets), which will give us timing resolution from microseconds to + * 25 seconds, with less precision as the numbers get larger. */ - private static final long[] bucketOffsets = { - 1, 2, 3, 4, 5, 6, 7, 8, 10, 12, 15, 18, 22, 26, 31, 38, 46, 55, 66, 79, 95, 114, 137, 164, 197, 237, 284, 341, 410, 492, 590, - 708, 850, 1020, 1224, 1469, 1763, 2116, 2539, 3047, 3657, 4388, 5266, 6319, 7583, 9100, 10920, 13104, 15725, 18870, 22644, - 27173, 32608, 39130, 46956, 56347, 67617, 81140, 97368, 116842, 140210, 168252, 201903, 242283, 290740, 348888, 418666, - 502400, 602880, 723456, 868147, 1041776, 1250132, 1500158, 1800190, 2160228, 2592274, 3110728, 3732874, 4479449, 5375339, - 6450407, 7740489, 9288586, 11146304, 13375565, 16050678, 19260813, 23112976, 27735572, 33282686 - }; - - private static final int numBuckets = bucketOffsets.length + 1; + private long[] bucketOffsets; + private int numBuckets; final AtomicLongArray buckets; public EstimatedHistogram() { + makeOffsets(90); + buckets = new AtomicLongArray(numBuckets); + } + + public EstimatedHistogram(int bucketCount) + { + makeOffsets(bucketCount); buckets = new AtomicLongArray(numBuckets); } + public EstimatedHistogram(long[] bucketData) + { + makeOffsets(bucketData.length - 1); + buckets = new AtomicLongArray(bucketData); + } + + private void makeOffsets(int size) + { + bucketOffsets = new long[size]; + long last = 1; + bucketOffsets[0] = last; + for(int i = 1; i < size; i++) + { + long next = Math.round(last * 1.2); + if (next == last) + next++; + bucketOffsets[i] = next; + last = next; + } + numBuckets = bucketOffsets.length + 1; + } + + public long[] getBucketOffsets() + { + return bucketOffsets; + } + public void add(long n) { int index = Arrays.binarySearch(bucketOffsets, n); @@ -74,4 +105,40 @@ public class EstimatedHistogram return rv; } + + public long min() + { + for (int i = 0; i < numBuckets; i++) + { + if (buckets.get(i) > 0) + return 
bucketOffsets[i == 0 ? 0 : i - 1]; + } + return 0; + } + + public long max() + { + for (int i = numBuckets - 1; i >= 0; i--) + { + if (buckets.get(i) > 0) + return bucketOffsets[i == 0 ? 0 : i - 1]; + } + return 0; + } + + public long median() + { + long max = 0; + long median = 0; + for (int i = 0; i < numBuckets; i++) + { + if (max < 1 || buckets.get(i) > max) + { + max = buckets.get(i); + if (max > 0) + median = bucketOffsets[i == 0 ? 0 : i - 1]; + } + } + return median; + } } Modified: cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java?rev=967213&r1=967212&r2=967213&view=diff ============================================================================== --- cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java (original) +++ cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java Fri Jul 23 19:22:58 2010 @@ -110,4 +110,22 @@ public class SSTableReaderTest extends C assert sstable.getPosition(dk, SSTableReader.Operator.EQ) == -1; } } + + @Test + public void testPersistentStatistics() throws IOException, ExecutionException, InterruptedException + { + + Table table = Table.open("Keyspace1"); + ColumnFamilyStore store = table.getColumnFamilyStore("Standard1"); + + for (int j = 0; j < 100; j += 2) + { + byte[] key = String.valueOf(j).getBytes(); + RowMutation rm = new RowMutation("Keyspace1", key); + rm.add(new QueryPath("Standard1", null, "0".getBytes()), new byte[0], new TimestampClock(j)); + rm.apply(); + } + store.forceBlockingFlush(); + assert store.getMaxRowCompactedSize() != 0; + } } Modified: cassandra/trunk/test/unit/org/apache/cassandra/utils/EstimatedHistogramTest.java URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/utils/EstimatedHistogramTest.java?rev=967213&r1=967212&r2=967213&view=diff 
============================================================================== --- cassandra/trunk/test/unit/org/apache/cassandra/utils/EstimatedHistogramTest.java (original) +++ cassandra/trunk/test/unit/org/apache/cassandra/utils/EstimatedHistogramTest.java Fri Jul 23 19:22:58 2010 @@ -31,20 +31,23 @@ public class EstimatedHistogramTest EstimatedHistogram histogram = new EstimatedHistogram(); histogram.add(0L); - assertEquals(1, histogram.get(true)[0]); + assertEquals(1, histogram.get(false)[0]); histogram.add(33282687); - assertEquals(1, histogram.get(true)[histogram.buckets.length()-1]); + assertEquals(1, histogram.get(false)[histogram.buckets.length()-1]); histogram.add(1); - assertEquals(1, histogram.get(true)[1]); + assertEquals(1, histogram.get(false)[1]); histogram.add(9); - assertEquals(1, histogram.get(true)[8]); + assertEquals(1, histogram.get(false)[8]); - histogram.add(23); - histogram.add(24); - histogram.add(25); - assertEquals(3, histogram.get(true)[13]); + histogram.add(20); + histogram.add(21); + histogram.add(22); + assertEquals(3, histogram.get(false)[13]); + assertEquals(1, histogram.min()); + assertEquals(25109160, histogram.max()); + assertEquals(20, histogram.median()); } }
