Author: gdusbabek
Date: Thu Mar 4 20:17:27 2010
New Revision: 919157
URL: http://svn.apache.org/viewvc?rev=919157&view=rev
Log:
push cfid generation down into CFMetaData, getting rid of Table.TableMetadata
in the process
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogHeader.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java?rev=919157&r1=919156&r2=919157&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java
Thu Mar 4 20:17:27 2010
@@ -26,12 +26,37 @@
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
public final class CFMetaData
{
public final static double DEFAULT_KEY_CACHE_SIZE = 0.1;
public final static double DEFAULT_ROW_CACHE_SIZE = 0.0;
+ private static final AtomicInteger idGen = new AtomicInteger(0);
+ private static final Map<Integer, String> idToName = new HashMap<Integer,
String>();
+
+ // this only gets used by a toString method.
+ public static final String getName(int id)
+ {
+ return idToName.get(id);
+ }
+
+ public static final int getCfCount()
+ {
+ return idToName.size();
+ }
+
+ // this gets called after initialization to make sure that id generation
happens properly.
+ public static final void fixMaxId()
+ {
+ int maxId = Collections.max(idToName.keySet());
+ idGen.set(maxId + 1);
+ }
+
public final String tableName; // name of table which has this
column family
public final String cfName; // name of the column family
public final String columnType; // type: super, standard, etc.
@@ -40,8 +65,9 @@
public final String comment; // for humans only
public final double rowCacheSize; // default 0
public final double keyCacheSize; // default 0.01
+ public final transient int cfId;
- public CFMetaData(String tableName, String cfName, String columnType,
AbstractType comparator, AbstractType subcolumnComparator, String comment,
double rowCacheSize, double keyCacheSize)
+ private CFMetaData(String tableName, String cfName, String columnType,
AbstractType comparator, AbstractType subcolumnComparator, String comment,
double rowCacheSize, double keyCacheSize, int cfId)
{
this.tableName = tableName;
this.cfName = cfName;
@@ -51,6 +77,13 @@
this.comment = comment;
this.rowCacheSize = rowCacheSize;
this.keyCacheSize = keyCacheSize;
+ this.cfId = cfId;
+ }
+
+ public CFMetaData(String tableName, String cfName, String columnType,
AbstractType comparator, AbstractType subcolumnComparator, String comment,
double rowCacheSize, double keyCacheSize)
+ {
+ this(tableName, cfName, columnType, comparator, subcolumnComparator,
comment, rowCacheSize, keyCacheSize, nextId());
+ idToName.put(cfId, cfName);
}
// a quick and dirty pretty printer for describing the column family...
@@ -77,6 +110,7 @@
dout.writeUTF(cfm.comment);
dout.writeDouble(cfm.rowCacheSize);
dout.writeDouble(cfm.keyCacheSize);
+ dout.writeInt(cfm.cfId);
dout.close();
return bout.toByteArray();
}
@@ -109,7 +143,8 @@
String comment = din.readBoolean() ? din.readUTF() : null;
double rowCacheSize = din.readDouble();
double keyCacheSize = din.readDouble();
- return new CFMetaData(tableName, cfName, columnType, comparator,
subcolumnComparator, comment, rowCacheSize, keyCacheSize);
+ int cfId = din.readInt();
+ return new CFMetaData(tableName, cfName, columnType, comparator,
subcolumnComparator, comment, rowCacheSize, keyCacheSize, cfId);
}
public boolean equals(Object obj)
@@ -124,6 +159,12 @@
&& FBUtilities.equals(other.subcolumnComparator,
subcolumnComparator)
&& FBUtilities.equals(other.comment, comment)
&& other.rowCacheSize == rowCacheSize
- && other.keyCacheSize == keyCacheSize;
+ && other.keyCacheSize == keyCacheSize
+ && other.cfId == cfId;
+ }
+
+ private static int nextId()
+ {
+ return idGen.getAndIncrement();
}
}
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=919157&r1=919156&r2=919157&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
Thu Mar 4 20:17:27 2010
@@ -818,34 +818,6 @@
}
}
- /**
- * Create the metadata tables. This table has information about
- * the table name and the column families that make up the table.
- * Each column family also has an associated ID which is an int.
- */
- // TODO duplicating data b/t tablemetadata and CFMetaData is confusing and
error-prone
- public static void storeMetadata() throws IOException
- {
- int cfId = 0;
- Set<String> tableset = tables.keySet();
-
- for (String table : tableset)
- {
- Table.TableMetadata tmetadata =
Table.TableMetadata.instance(table);
- if (tmetadata.isEmpty())
- {
- tmetadata = Table.TableMetadata.instance(table);
- /* Column families associated with this table */
- Map<String, CFMetaData> columnFamilies =
tables.get(table).cfMetaData();
-
- for (String columnFamily : columnFamilies.keySet())
- {
- tmetadata.add(columnFamily, cfId++,
DatabaseDescriptor.getColumnType(table, columnFamily));
- }
- }
- }
- }
-
public static int getGcGraceInSeconds()
{
return gcGraceInSeconds;
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java?rev=919157&r1=919156&r2=919157&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
(original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java Thu
Mar 4 20:17:27 2010
@@ -71,115 +71,11 @@
}
}
- /*
- * This class represents the metadata of this Table. The metadata
- * is basically the column family name and the ID associated with
- * this column family. We use this ID in the Commit Log header to
- * determine when a log file that has been rolled can be deleted.
- */
- public static class TableMetadata
- {
- private static HashMap<String,TableMetadata> tableMetadataMap = new
HashMap<String,TableMetadata>();
- private static Map<Integer, String> idCfMap_ = new HashMap<Integer,
String>();
-
- static
- {
- try
- {
- DatabaseDescriptor.storeMetadata();
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
- }
-
- public static synchronized Table.TableMetadata instance(String
tableName) throws IOException
- {
- if ( tableMetadataMap.get(tableName) == null )
- {
- tableMetadataMap.put(tableName, new Table.TableMetadata());
- }
- return tableMetadataMap.get(tableName);
- }
-
- /* The mapping between column family and the column type. */
- private Map<String, String> cfTypeMap_ = new HashMap<String, String>();
- private Map<String, Integer> cfIdMap_ = new HashMap<String, Integer>();
-
- public void add(String cf, int id)
- {
- add(cf, id, "Standard");
- }
-
- public void add(String cf, int id, String type)
- {
- if (logger.isDebugEnabled())
- logger.debug("adding " + cf + " as " + id);
- assert !idCfMap_.containsKey(id);
- cfIdMap_.put(cf, id);
- idCfMap_.put(id, cf);
- cfTypeMap_.put(cf, type);
- }
-
- public boolean isEmpty()
- {
- return cfIdMap_.isEmpty();
- }
-
- int getColumnFamilyId(String columnFamily)
- {
- return cfIdMap_.get(columnFamily);
- }
-
- public static String getColumnFamilyName(int id)
- {
- return idCfMap_.get(id);
- }
-
- String getColumnFamilyType(String cfName)
- {
- return cfTypeMap_.get(cfName);
- }
-
- Set<String> getColumnFamilies()
- {
- return cfIdMap_.keySet();
- }
-
- int size()
- {
- return cfIdMap_.size();
- }
-
- boolean isValidColumnFamily(String cfName)
- {
- return cfIdMap_.containsKey(cfName);
- }
-
- public String toString()
- {
- return "TableMetadata(" + FBUtilities.mapToString(cfIdMap_) + ")";
- }
-
- public static int getColumnFamilyCount()
- {
- return idCfMap_.size();
- }
-
- public static String getColumnFamilyIDString()
- {
- return FBUtilities.mapToString(tableMetadataMap);
- }
- }
-
/** Table objects, one per keyspace. only one instance should ever exist
for any given keyspace. */
private static final Map<String, Table> instances = new
NonBlockingHashMap<String, Table>();
/* Table name. */
public final String name;
- /* Handle to the Table Metadata */
- private final Table.TableMetadata tableMetadata;
/* ColumnFamilyStore per column family */
private final Map<String, ColumnFamilyStore> columnFamilyStores = new
HashMap<String, ColumnFamilyStore>();
// cache application CFs since Range queries ask for them a _lot_
@@ -207,7 +103,7 @@
public Set<String> getColumnFamilies()
{
- return tableMetadata.getColumnFamilies();
+ return
DatabaseDescriptor.getTableDefinition(name).cfMetaData().keySet();
}
public Collection<ColumnFamilyStore> getColumnFamilyStores()
@@ -228,7 +124,7 @@
if (name.equals(SYSTEM_TABLE))
throw new RuntimeException("Cleanup of the system table is neither
necessary nor wise");
- Set<String> columnFamilies = tableMetadata.getColumnFamilies();
+ Set<String> columnFamilies = getColumnFamilies();
for ( String columnFamily : columnFamilies )
{
ColumnFamilyStore cfStore = columnFamilyStores.get( columnFamily );
@@ -285,7 +181,7 @@
public List<SSTableReader> forceAntiCompaction(Collection<Range> ranges,
InetAddress target)
{
List<SSTableReader> allResults = new ArrayList<SSTableReader>();
- Set<String> columnFamilies = tableMetadata.getColumnFamilies();
+ Set<String> columnFamilies = getColumnFamilies();
for ( String columnFamily : columnFamilies )
{
ColumnFamilyStore cfStore = columnFamilyStores.get( columnFamily );
@@ -307,7 +203,7 @@
*/
public void forceCompaction()
{
- Set<String> columnFamilies = tableMetadata.getColumnFamilies();
+ Set<String> columnFamilies = getColumnFamilies();
for ( String columnFamily : columnFamilies )
{
ColumnFamilyStore cfStore = columnFamilyStores.get( columnFamily );
@@ -319,7 +215,7 @@
List<SSTableReader> getAllSSTablesOnDisk()
{
List<SSTableReader> list = new ArrayList<SSTableReader>();
- Set<String> columnFamilies = tableMetadata.getColumnFamilies();
+ Set<String> columnFamilies = getColumnFamilies();
for ( String columnFamily : columnFamilies )
{
ColumnFamilyStore cfStore = columnFamilyStores.get( columnFamily );
@@ -333,8 +229,7 @@
{
name = table;
waitForCommitLog = DatabaseDescriptor.getCommitLogSync() ==
DatabaseDescriptor.CommitLogSync.batch;
- tableMetadata = Table.TableMetadata.instance(table);
- for (String columnFamily : tableMetadata.getColumnFamilies())
+ for (String columnFamily : getColumnFamilies())
{
columnFamilyStores.put(columnFamily,
ColumnFamilyStore.createColumnFamilyStore(table, columnFamily));
}
@@ -362,7 +257,7 @@
public int getColumnFamilyId(String columnFamily)
{
- return tableMetadata.getColumnFamilyId(columnFamily);
+ return
DatabaseDescriptor.getTableDefinition(name).cfMetaData().get(columnFamily).cfId;
}
/**
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java?rev=919157&r1=919156&r2=919157&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
Thu Mar 4 20:17:27 2010
@@ -72,7 +72,7 @@
{
private static volatile int SEGMENT_SIZE = 128*1024*1024; // roll after
log gets this big
- private static final Logger logger = Logger.getLogger(CommitLog.class);
+ static final Logger logger = Logger.getLogger(CommitLog.class);
public static CommitLog instance()
{
@@ -108,8 +108,7 @@
{
// all old segments are recovered and deleted before CommitLog is
instantiated.
// All we need to do is create a new one.
- int cfSize = Table.TableMetadata.getColumnFamilyCount();
- segments.add(new CommitLogSegment(cfSize));
+ segments.add(new CommitLogSegment());
if (DatabaseDescriptor.getCommitLogSync() ==
DatabaseDescriptor.CommitLogSync.periodic)
{
@@ -368,7 +367,7 @@
private void
discardCompletedSegmentsInternal(CommitLogSegment.CommitLogContext context, int
id) throws IOException
{
if (logger.isDebugEnabled())
- logger.debug("discard completed log segments for " + context + ",
column family " + id + ". CFIDs are " +
Table.TableMetadata.getColumnFamilyIDString());
+ logger.debug("discard completed log segments for " + context + ",
column family " + id + ".");
/*
* log replay assumes that we only have to look at entries past the
last
@@ -441,7 +440,7 @@
if (currentSegment().length() >= SEGMENT_SIZE)
{
sync();
- segments.add(new
CommitLogSegment(currentSegment().getHeader().getColumnFamilyCount()));
+ segments.add(new CommitLogSegment());
}
return context;
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogHeader.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogHeader.java?rev=919157&r1=919156&r2=919157&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogHeader.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogHeader.java
Thu Mar 4 20:17:27 2010
@@ -19,46 +19,41 @@
package org.apache.cassandra.db.commitlog;
import java.io.*;
-import java.util.BitSet;
-import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Map;
-import org.apache.cassandra.db.Table;
+import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.io.ICompactSerializer;
import org.apache.cassandra.io.util.BufferedRandomAccessFile;
-import org.apache.cassandra.utils.BitSetSerializer;
class CommitLogHeader
-{
+{
private static CommitLogHeaderSerializer serializer = new
CommitLogHeaderSerializer();
- static CommitLogHeaderSerializer serializer()
+ static int getLowestPosition(CommitLogHeader clheader)
{
- return serializer;
- }
-
- static int getLowestPosition(CommitLogHeader clHeader)
- {
- int minPosition = Integer.MAX_VALUE;
- for ( int position : clHeader.lastFlushedAt)
- {
- if ( position < minPosition && position > 0)
+ return clheader.lastFlushedAt.size() == 0 ? 0 :
Collections.min(clheader.lastFlushedAt.values(), new Comparator<Integer>(){
+ public int compare(Integer o1, Integer o2)
{
- minPosition = position;
+ if (o1 == 0)
+ return 1;
+ else if (o2 == 0)
+ return -1;
+ else
+ return o1 - o2;
}
- }
-
- if(minPosition == Integer.MAX_VALUE)
- minPosition = 0;
- return minPosition;
+ });
}
- private BitSet dirty; // columnfamilies with un-flushed data in this
CommitLog
- private int[] lastFlushedAt; // position at which each CF was last flushed
+ private Map<Integer, Integer> lastFlushedAt; // position at which each CF
was last flushed
+ private final int maxSerializedSize;
- CommitLogHeader(int size)
+ CommitLogHeader()
{
- dirty = new BitSet(size);
- lastFlushedAt = new int[size];
+ lastFlushedAt = new HashMap<Integer, Integer>();
+ maxSerializedSize = 8 * CFMetaData.getCfCount();
}
/*
@@ -66,62 +61,58 @@
* also builds an index of position to column family
* Id.
*/
- CommitLogHeader(BitSet dirty, int[] lastFlushedAt)
+ private CommitLogHeader(Map<Integer, Integer> lastFlushedAt)
{
- this.dirty = dirty;
+ assert lastFlushedAt.size() <= CFMetaData.getCfCount();
this.lastFlushedAt = lastFlushedAt;
+ maxSerializedSize = 8 * CFMetaData.getCfCount();
}
- boolean isDirty(int index)
+ boolean isDirty(int cfId)
{
- return dirty.get(index);
+ return lastFlushedAt.containsKey(cfId);
}
int getPosition(int index)
{
- return lastFlushedAt[index];
+ Integer x = lastFlushedAt.get(index);
+ return x == null ? 0 : x;
}
- void turnOn(int index, long position)
+ void turnOn(int cfId, long position)
{
- dirty.set(index);
- lastFlushedAt[index] = (int) position;
+ lastFlushedAt.put(cfId, (int)position);
}
- void turnOff(int index)
+ void turnOff(int cfId)
{
- dirty.set(index, false);
- lastFlushedAt[index] = 0;
+ lastFlushedAt.remove(cfId);
}
boolean isSafeToDelete() throws IOException
{
- return dirty.isEmpty();
+ return lastFlushedAt.isEmpty();
}
byte[] toByteArray() throws IOException
{
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ ByteArrayOutputStream bos = new
ByteArrayOutputStream(maxSerializedSize);
DataOutputStream dos = new DataOutputStream(bos);
- CommitLogHeader.serializer().serialize(this, dos);
- return bos.toByteArray();
+ serializer.serialize(this, dos);
+ byte[] src = bos.toByteArray();
+ assert src.length < maxSerializedSize;
+ byte[] dst = new byte[maxSerializedSize];
+ System.arraycopy(src, 0, dst, 0, src.length);
+ return dst;
}
public String toString()
{
StringBuilder sb = new StringBuilder("");
- sb.append("CLH(dirty={");
- for ( int i = 0; i < dirty.size(); ++i )
- {
- if (dirty.get(i))
- {
-
sb.append(Table.TableMetadata.getColumnFamilyName(i)).append(", ");
- }
- }
- sb.append("}, flushed={");
- for (int i = 0; i < lastFlushedAt.length; i++)
+ sb.append("CLH(dirty+flushed={");
+ for (Map.Entry<Integer, Integer> entry : lastFlushedAt.entrySet())
{
- sb.append(Table.TableMetadata.getColumnFamilyName(i)).append(":
").append(lastFlushedAt[i]).append(", ");
+ sb.append(CFMetaData.getName(entry.getKey())).append(":
").append(entry.getValue()).append(", ");
}
sb.append("})");
return sb.toString();
@@ -130,51 +121,39 @@
public String dirtyString()
{
StringBuilder sb = new StringBuilder();
- for (int i = 0; i < dirty.length(); i++)
- {
- if (dirty.get(i))
- {
- sb.append(i).append(", ");
- }
- }
+ for (Map.Entry<Integer, Integer> entry : lastFlushedAt.entrySet())
+ sb.append(entry.getKey()).append(", ");
return sb.toString();
}
static CommitLogHeader readCommitLogHeader(BufferedRandomAccessFile
logReader) throws IOException
{
- int size = (int)logReader.readLong();
- byte[] bytes = new byte[size];
- logReader.read(bytes);
+ int statedSize = logReader.readInt();
+ byte[] bytes = new byte[statedSize];
+ logReader.readFully(bytes);
ByteArrayInputStream byteStream = new ByteArrayInputStream(bytes);
- return serializer().deserialize(new DataInputStream(byteStream));
- }
-
- public int getColumnFamilyCount()
- {
- return lastFlushedAt.length;
+ return serializer.deserialize(new DataInputStream(byteStream));
}
static class CommitLogHeaderSerializer implements
ICompactSerializer<CommitLogHeader>
{
public void serialize(CommitLogHeader clHeader, DataOutputStream dos)
throws IOException
{
- BitSetSerializer.serialize(clHeader.dirty, dos);
- dos.writeInt(clHeader.lastFlushedAt.length);
- for (int position : clHeader.lastFlushedAt)
+ dos.writeInt(clHeader.lastFlushedAt.size());
+ for (Map.Entry<Integer, Integer> entry :
clHeader.lastFlushedAt.entrySet())
{
- dos.writeInt(position);
+ dos.writeInt(entry.getKey());
+ dos.writeInt(entry.getValue());
}
}
public CommitLogHeader deserialize(DataInputStream dis) throws
IOException
{
- BitSet bitFlags = BitSetSerializer.deserialize(dis);
- int[] position = new int[dis.readInt()];
- for (int i = 0; i < position.length; ++i)
- {
- position[i] = dis.readInt();
- }
- return new CommitLogHeader(bitFlags, position);
+ int lfSz = dis.readInt();
+ Map<Integer, Integer> lastFlushedAt = new HashMap<Integer,
Integer>();
+ for (int i = 0; i < lfSz; i++)
+ lastFlushedAt.put(dis.readInt(), dis.readInt());
+ return new CommitLogHeader(lastFlushedAt);
}
}
}
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java?rev=919157&r1=919156&r2=919157&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
Thu Mar 4 20:17:27 2010
@@ -43,9 +43,9 @@
private final BufferedRandomAccessFile logWriter;
private final CommitLogHeader header;
- public CommitLogSegment(int cfCount)
+ public CommitLogSegment()
{
- this.header = new CommitLogHeader(cfCount);
+ this.header = new CommitLogHeader();
String logFile = DatabaseDescriptor.getLogFileLocation() +
File.separator + "CommitLog-" + System.currentTimeMillis() + ".log";
logger.info("Creating new commitlog segment " + logFile);
@@ -78,7 +78,7 @@
private void writeCommitLogHeader(byte[] bytes) throws IOException
{
- logWriter.writeLong(bytes.length);
+ logWriter.writeInt(bytes.length);
logWriter.write(bytes);
logWriter.sync();
}