Author: gdusbabek
Date: Wed Jul 21 15:14:25 2010
New Revision: 966277

URL: http://svn.apache.org/viewvc?rev=966277&view=rev
Log:
use avro serialization for KSM, CFS and parts of Migrations. patch by stuhood, 
reviewed by gdusbabek. CASSANDRA-1186

Modified:
    cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java
    cassandra/trunk/src/java/org/apache/cassandra/config/ColumnDefinition.java
    cassandra/trunk/src/java/org/apache/cassandra/config/ConfigurationException.java
    cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
    cassandra/trunk/src/java/org/apache/cassandra/config/KSMetaData.java
    cassandra/trunk/src/java/org/apache/cassandra/db/DefsTable.java
    cassandra/trunk/src/java/org/apache/cassandra/db/migration/AddColumnFamily.java
    cassandra/trunk/src/java/org/apache/cassandra/db/migration/AddKeyspace.java
    cassandra/trunk/src/java/org/apache/cassandra/db/migration/Migration.java
    cassandra/trunk/test/unit/org/apache/cassandra/config/ColumnDefinitionTest.java
    cassandra/trunk/test/unit/org/apache/cassandra/config/DatabaseDescriptorTest.java
    cassandra/trunk/test/unit/org/apache/cassandra/db/DefsTest.java

Modified: cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java?rev=966277&r1=966276&r2=966277&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java Wed 
Jul 21 15:14:25 2010
@@ -22,20 +22,25 @@ import java.io.*;
 import java.util.*;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import com.google.common.collect.*;
+import org.apache.avro.Schema;
+import org.apache.avro.util.Utf8;
 import org.apache.commons.lang.builder.EqualsBuilder;
 import org.apache.commons.lang.builder.HashCodeBuilder;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.BiMap;
-import com.google.common.collect.HashBiMap;
-import org.apache.cassandra.db.*;
+import org.apache.cassandra.io.SerDeUtils;
+import org.apache.cassandra.db.ColumnFamilyType;
+import org.apache.cassandra.db.ClockType;
 import org.apache.cassandra.db.clock.AbstractReconciler;
 import org.apache.cassandra.db.clock.TimestampReconciler;
-import org.apache.cassandra.db.marshal.*;
+import org.apache.cassandra.db.HintedHandOffManager;
+import org.apache.cassandra.db.SystemTable;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.BytesType;
+import org.apache.cassandra.db.marshal.TimeUUIDType;
+import org.apache.cassandra.db.marshal.UTF8Type;
 import org.apache.cassandra.db.migration.Migration;
-import org.apache.cassandra.locator.DatacenterShardStrategy;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
 
@@ -48,8 +53,6 @@ public final class CFMetaData
     public final static boolean DEFAULT_PRELOAD_ROW_CACHE = false;
     private static final int MIN_CF_ID = 1000;
 
-    private static final Logger logger = 
LoggerFactory.getLogger(DatacenterShardStrategy.class);
-
     private static final AtomicInteger idGen = new AtomicInteger(MIN_CF_ID);
     
     private static final Map<Integer, String> currentCfNames = new 
HashMap<Integer, String>();
@@ -114,7 +117,6 @@ public final class CFMetaData
     public final Integer cfId;
     public boolean preloadRowCache;
 
-    // BytesToken because byte[].hashCode|equals is inherited from Object.  
gggrrr...
     public final Map<byte[], ColumnDefinition> column_metadata;
 
     private CFMetaData(String tableName,
@@ -142,7 +144,7 @@ public final class CFMetaData
         // cfType == Super, subcolumnComparator should default to BytesType if 
not set.
         this.subcolumnComparator = subcolumnComparator == null && cfType == 
ColumnFamilyType.Super ? BytesType.instance : subcolumnComparator;
         this.reconciler = reconciler;
-        this.comment = comment;
+        this.comment = comment == null ? "" : comment;
         this.rowCacheSize = rowCacheSize;
         this.preloadRowCache = preloadRowCache;
         this.keyCacheSize = keyCacheSize;
@@ -198,74 +200,53 @@ public final class CFMetaData
                + "Columns Sorted By: " + comparator + "\n";
     }
 
-    public static byte[] serialize(CFMetaData cfm) throws IOException
+    public org.apache.cassandra.avro.CfDef deflate()
     {
-        ByteArrayOutputStream bout = new ByteArrayOutputStream();
-        DataOutputStream dout = new DataOutputStream(bout);
-        dout.writeUTF(cfm.tableName);
-        dout.writeUTF(cfm.cfName);
-        dout.writeUTF(cfm.cfType.name());
-        dout.writeUTF(cfm.clockType.name());
-        dout.writeUTF(cfm.comparator.getClass().getName());
-        dout.writeBoolean(cfm.subcolumnComparator != null);
-        if (cfm.subcolumnComparator != null)
-            dout.writeUTF(cfm.subcolumnComparator.getClass().getName());
-        dout.writeUTF(cfm.reconciler.getClass().getName());
-        dout.writeBoolean(cfm.comment != null);
-        if (cfm.comment != null)
-            dout.writeUTF(cfm.comment);
-        dout.writeDouble(cfm.rowCacheSize);
-        dout.writeBoolean(cfm.preloadRowCache);
-        dout.writeDouble(cfm.keyCacheSize);
-        dout.writeDouble(cfm.readRepairChance);
-        dout.writeInt(cfm.cfId);
-        dout.writeInt(cfm.column_metadata.size());
-        for (ColumnDefinition cd : cfm.column_metadata.values())
-        {
-            byte[] cdBytes = ColumnDefinition.serialize(cd);
-            dout.writeInt(cdBytes.length);
-            dout.write(cdBytes);
-        }
-        dout.close();
-        return bout.toByteArray();
+        org.apache.cassandra.avro.CfDef cf = new 
org.apache.cassandra.avro.CfDef();
+        cf.id = cfId;
+        cf.keyspace = new Utf8(tableName);
+        cf.name = new Utf8(cfName);
+        cf.column_type = new Utf8(cfType.name());
+        cf.clock_type = new Utf8(clockType.name());
+        cf.comparator_type = new Utf8(comparator.getClass().getName());
+        if (subcolumnComparator != null)
+            cf.subcomparator_type = new 
Utf8(subcolumnComparator.getClass().getName());
+        cf.reconciler = new Utf8(reconciler.getClass().getName());
+        cf.comment = new Utf8(comment);
+        cf.row_cache_size = rowCacheSize;
+        cf.key_cache_size = keyCacheSize;
+        cf.preload_row_cache = preloadRowCache;
+        cf.read_repair_chance = readRepairChance;
+        cf.column_metadata = SerDeUtils.createArray(column_metadata.size(),
+                                                    
org.apache.cassandra.avro.ColumnDef.SCHEMA$);
+        for (ColumnDefinition cd : column_metadata.values())
+            cf.column_metadata.add(cd.deflate());
+        return cf;
     }
 
-    public static CFMetaData deserialize(InputStream in) throws IOException, 
ConfigurationException
+    public static CFMetaData inflate(org.apache.cassandra.avro.CfDef cf) 
throws ConfigurationException
     {
-        DataInputStream din = new DataInputStream(in);
-        String tableName = din.readUTF();
-        String cfName = din.readUTF();
-        ColumnFamilyType cfType = ColumnFamilyType.create(din.readUTF());
-        ClockType clockType = ClockType.create(din.readUTF());
-        AbstractType comparator = 
DatabaseDescriptor.getComparator(din.readUTF());
+        AbstractType comparator = 
DatabaseDescriptor.getComparator(cf.comparator_type.toString());
         AbstractType subcolumnComparator = null;
-        subcolumnComparator = din.readBoolean() ? 
DatabaseDescriptor.getComparator(din.readUTF()) : null;
+        if (cf.subcomparator_type != null)
+            subcolumnComparator = 
DatabaseDescriptor.getComparator(cf.subcomparator_type.toString());
         AbstractReconciler reconciler = null;
         try
         {
-            reconciler = 
(AbstractReconciler)Class.forName(din.readUTF()).newInstance();
+            reconciler = 
(AbstractReconciler)Class.forName(cf.reconciler.toString()).newInstance();
         }
         catch (Exception ex)
         {
-            throw new IOException(ex);
+            throw new ConfigurationException("Could not create Reconciler of 
type " + cf.reconciler, ex);
         }
-        String comment = din.readBoolean() ? din.readUTF() : null;
-        double rowCacheSize = din.readDouble();
-        boolean preloadRowCache = din.readBoolean();
-        double keyCacheSize = din.readDouble();
-        double readRepairChance = din.readDouble();
-        int cfId = din.readInt();
-        int columnMetadataEntries = din.readInt();
         Map<byte[], ColumnDefinition> column_metadata = new TreeMap<byte[], 
ColumnDefinition>(FBUtilities.byteArrayComparator);
-        for (int i = 0; i < columnMetadataEntries; i++)
+        Iterator<org.apache.cassandra.avro.ColumnDef> cditer = 
cf.column_metadata.iterator();
+        while (cditer.hasNext())
         {
-            int cdSize = din.readInt();
-            byte[] cdBytes = new byte[cdSize];
-            din.readFully(cdBytes);
-            ColumnDefinition cd = ColumnDefinition.deserialize(cdBytes);
+            ColumnDefinition cd = ColumnDefinition.inflate(cditer.next());
             column_metadata.put(cd.name, cd);
         }
-        return new CFMetaData(tableName, cfName, cfType, clockType, 
comparator, subcolumnComparator, reconciler, comment, rowCacheSize, 
preloadRowCache, keyCacheSize, readRepairChance, cfId, column_metadata);
+        return new CFMetaData(cf.keyspace.toString(), cf.name.toString(), 
ColumnFamilyType.create(cf.column_type.toString()), 
ClockType.create(cf.clock_type.toString()), comparator, subcolumnComparator, 
reconciler, cf.comment.toString(), cf.row_cache_size, cf.preload_row_cache, 
cf.key_cache_size, cf.read_repair_chance, cf.id, column_metadata);
     }
 
     public boolean equals(Object obj) 

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/config/ColumnDefinition.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/ColumnDefinition.java?rev=966277&r1=966276&r2=966277&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/ColumnDefinition.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/ColumnDefinition.java 
Wed Jul 21 15:14:25 2010
@@ -1,8 +1,9 @@
 package org.apache.cassandra.config;
 
-import java.io.*;
+import java.nio.ByteBuffer;
 import java.util.*;
 
+import org.apache.avro.util.Utf8;
 import org.apache.commons.lang.builder.EqualsBuilder;
 import org.apache.commons.lang.builder.HashCodeBuilder;
 
@@ -53,45 +54,27 @@ public class ColumnDefinition {
         return result;
     }
 
-    public static byte[] serialize(ColumnDefinition cd) throws IOException
+    public org.apache.cassandra.avro.ColumnDef deflate()
     {
-        ByteArrayOutputStream bout = new ByteArrayOutputStream();
-        DataOutputStream out = new DataOutputStream(bout);
-        out.writeInt(cd.name.length);
-        out.write(cd.name);
-        out.writeUTF(cd.validator.getClass().getName());
-
-        out.writeBoolean(cd.index_type != null);
-        if (cd.index_type != null)
-            out.writeInt(cd.index_type.ordinal());
-
-        out.writeBoolean(cd.index_name != null);
-        if (cd.index_name != null)
-            out.writeUTF(cd.index_name);
-
-        out.close();
-        return bout.toByteArray();
-    }
-
-    public static ColumnDefinition deserialize(byte[] bytes) throws IOException
-    {
-        DataInputStream in = new DataInputStream(new 
ByteArrayInputStream(bytes));
-        int nameSize = in.readInt();
-        byte[] name = new byte[nameSize];
-        in.readFully(name);
-        String validation_class = in.readUTF();
-
-        IndexType index_type = null;
-        if (in.readBoolean())
-            index_type = IndexType.values()[in.readInt()];
-
-        String index_name = null;
-        if (in.readBoolean())
-            index_name = in.readUTF();
-
+        org.apache.cassandra.avro.ColumnDef cd = new 
org.apache.cassandra.avro.ColumnDef();
+        cd.name = ByteBuffer.wrap(name);
+        cd.validation_class = new Utf8(validator.getClass().getName());
+        cd.index_type = index_type == null ? null :
+            Enum.valueOf(org.apache.cassandra.avro.IndexType.class, 
index_type.name());
+        cd.index_name = index_name == null ? null : new Utf8(index_name);
+        return cd;
+    }
+
+    public static ColumnDefinition inflate(org.apache.cassandra.avro.ColumnDef 
cd) throws ConfigurationException
+    {
+        byte[] name = new byte[cd.name.remaining()];
+        cd.name.get(name, 0, name.length);
+        IndexType index_type = cd.index_type == null ? null :
+            Enum.valueOf(IndexType.class, cd.index_type.name());
+        String index_name = cd.index_name == null ? null : 
cd.index_name.toString();
         try
         {
-            return new ColumnDefinition(name, validation_class, index_type, 
index_name);
+            return new ColumnDefinition(name, cd.validation_class.toString(), 
index_type, index_name);
         }
         catch (ConfigurationException e)
         {

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/config/ConfigurationException.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/ConfigurationException.java?rev=966277&r1=966276&r2=966277&view=diff
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/config/ConfigurationException.java
 (original)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/config/ConfigurationException.java
 Wed Jul 21 15:14:25 2010
@@ -18,8 +18,6 @@
 
 package org.apache.cassandra.config;
 
-import java.io.IOException;
-
 public class ConfigurationException extends Exception
 {
     public ConfigurationException(String message)
@@ -27,8 +25,8 @@ public class ConfigurationException exte
         super(message);
     }
 
-    public ConfigurationException(String message, IOException ioe)
+    public ConfigurationException(String message, Exception e)
     {
-        super(message, ioe);
+        super(message, e);
     }
 }

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=966277&r1=966276&r2=966277&view=diff
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java 
(original)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java 
Wed Jul 21 15:14:25 2010
@@ -62,8 +62,6 @@ public class DatabaseDescriptor
 {
     private static Logger logger = 
LoggerFactory.getLogger(DatabaseDescriptor.class);
 
-    public static final String random = "RANDOM";
-    public static final String ophf = "OPHF";
     private static IEndpointSnitch snitch;
     private static InetAddress listenAddress; // leave null so we can fall 
through to getLocalHost
     private static InetAddress rpcAddress;

Modified: cassandra/trunk/src/java/org/apache/cassandra/config/KSMetaData.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/KSMetaData.java?rev=966277&r1=966276&r2=966277&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/KSMetaData.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/KSMetaData.java Wed 
Jul 21 15:14:25 2010
@@ -19,18 +19,17 @@
 package org.apache.cassandra.config;
 
 import org.apache.cassandra.locator.AbstractReplicationStrategy;
+import org.apache.cassandra.locator.RackUnawareStrategy;
+import org.apache.cassandra.io.SerDeUtils;
 
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.avro.util.Utf8;
 import org.apache.commons.lang.ObjectUtils;
 
 public final class KSMetaData
@@ -43,7 +42,7 @@ public final class KSMetaData
     public KSMetaData(String name, Class<? extends 
AbstractReplicationStrategy> strategyClass, int replicationFactor, 
CFMetaData... cfDefs)
     {
         this.name = name;
-        this.strategyClass = strategyClass;
+        this.strategyClass = strategyClass == null ? RackUnawareStrategy.class 
: strategyClass;
         this.replicationFactor = replicationFactor;
         Map<String, CFMetaData> cfmap = new HashMap<String, CFMetaData>();
         for (CFMetaData cfm : cfDefs)
@@ -70,50 +69,35 @@ public final class KSMetaData
         return cfMetaData;
     }
         
-    public static byte[] serialize(KSMetaData ksm) throws IOException
+    public org.apache.cassandra.avro.KsDef deflate()
     {
-        ByteArrayOutputStream bout = new ByteArrayOutputStream();
-        DataOutputStream dout = new DataOutputStream(bout);
-        dout.writeUTF(ksm.name);
-        dout.writeBoolean(ksm.strategyClass != null);
-        if (ksm.strategyClass != null)
-            dout.writeUTF(ksm.strategyClass.getName());
-        dout.writeInt(ksm.replicationFactor);
-        dout.writeInt(ksm.cfMetaData.size());
-        for (CFMetaData cfm : ksm.cfMetaData.values())
-            dout.write(CFMetaData.serialize(cfm));
-        dout.close();
-        return bout.toByteArray();
+        org.apache.cassandra.avro.KsDef ks = new 
org.apache.cassandra.avro.KsDef();
+        ks.name = new Utf8(name);
+        ks.strategy_class = new Utf8(strategyClass.getName());
+        ks.replication_factor = replicationFactor;
+        ks.cf_defs = SerDeUtils.createArray(cfMetaData.size(), 
org.apache.cassandra.avro.CfDef.SCHEMA$);
+        for (CFMetaData cfm : cfMetaData.values())
+            ks.cf_defs.add(cfm.deflate());
+        return ks;
     }
 
-    public static KSMetaData deserialize(InputStream in) throws IOException
+    public static KSMetaData inflate(org.apache.cassandra.avro.KsDef ks) 
throws ConfigurationException
     {
-        DataInputStream din = new DataInputStream(in);
-        String name = din.readUTF();
         Class<AbstractReplicationStrategy> repStratClass = null;
         try
         {
-            repStratClass = din.readBoolean() ? 
(Class<AbstractReplicationStrategy>)Class.forName(din.readUTF()) : null;
+            repStratClass = 
(Class<AbstractReplicationStrategy>)Class.forName(ks.strategy_class.toString());
         }
         catch (Exception ex)
         {
-            throw new IOException(ex);
+            throw new ConfigurationException("Could not create 
ReplicationStrategy of type " + ks.strategy_class, ex);
         }
-        int replicationFactor = din.readInt();
-        int cfsz = din.readInt();
+        int cfsz = (int)ks.cf_defs.size();
         CFMetaData[] cfMetaData = new CFMetaData[cfsz];
+        Iterator<org.apache.cassandra.avro.CfDef> cfiter = 
ks.cf_defs.iterator();
         for (int i = 0; i < cfsz; i++)
-        {
-            try
-            {
-                cfMetaData[i] = CFMetaData.deserialize(din);
-            }
-            catch (ConfigurationException e)
-            {
-                throw new IOException(e);
-            }
-        }
+            cfMetaData[i] = CFMetaData.inflate(cfiter.next());
 
-        return new KSMetaData(name, repStratClass, replicationFactor, 
cfMetaData);
+        return new KSMetaData(ks.name.toString(), repStratClass, 
ks.replication_factor, cfMetaData);
     }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/DefsTable.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/DefsTable.java?rev=966277&r1=966276&r2=966277&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/DefsTable.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/DefsTable.java Wed Jul 21 
15:14:25 2010
@@ -18,6 +18,9 @@
 
 package org.apache.cassandra.db;
 
+import org.apache.avro.Schema;
+import org.apache.avro.io.BinaryDecoder;
+
 import org.apache.cassandra.config.ConfigurationException;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.KSMetaData;
@@ -26,12 +29,15 @@ import org.apache.cassandra.db.filter.Qu
 import org.apache.cassandra.db.filter.QueryPath;
 import org.apache.cassandra.db.filter.SliceQueryFilter;
 import org.apache.cassandra.db.migration.Migration;
+import org.apache.cassandra.io.SerDeUtils;
+import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.UUIDGen;
 
 import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
 import java.io.File;
 import java.io.FileFilter;
 import java.io.IOException;
@@ -41,44 +47,77 @@ import java.util.Collection;
 import java.util.List;
 import java.util.UUID;
 
+import static com.google.common.base.Charsets.UTF_8;
+
 public class DefsTable
 {
+    // column name for the schema storing serialized keyspace definitions
+    // NB: must be an invalid keyspace name
+    public static final byte[] DEFINITION_SCHEMA_COLUMN_NAME = 
"Avro/Schema".getBytes(UTF_8);
+
     /** dumps current keyspace definitions to storage */
     public static synchronized void dumpToStorage(UUID version) throws 
IOException
     {
-        byte[] versionKey = Migration.toBytes(version);
-        long now = System.currentTimeMillis();
+        final byte[] versionKey = Migration.toUTF8Bytes(version);
+
+        // build a list of keyspaces
+        Collection<String> ksnames = DatabaseDescriptor.getNonSystemTables();
+
+        // persist keyspaces under new version
         RowMutation rm = new RowMutation(Table.SYSTEM_TABLE, versionKey);
-        for (String tableName : DatabaseDescriptor.getNonSystemTables())
+        TimestampClock now = new TimestampClock(System.currentTimeMillis());
+        for (String ksname : ksnames)
         {
-            KSMetaData ks = DatabaseDescriptor.getTableDefinition(tableName);
-            rm.add(new QueryPath(Migration.SCHEMA_CF, null, 
ks.name.getBytes()), KSMetaData.serialize(ks), new TimestampClock(now));
+            KSMetaData ksm = DatabaseDescriptor.getTableDefinition(ksname);
+            rm.add(new QueryPath(Migration.SCHEMA_CF, null, 
ksm.name.getBytes(UTF_8)), SerDeUtils.serialize(ksm.deflate()), now);
         }
+        // add the schema
+        rm.add(new QueryPath(Migration.SCHEMA_CF,
+                             null,
+                             DEFINITION_SCHEMA_COLUMN_NAME),
+                             
org.apache.cassandra.avro.KsDef.SCHEMA$.toString().getBytes(UTF_8),
+                             now);
         rm.apply();
-        
+
+        // apply new version
         rm = new RowMutation(Table.SYSTEM_TABLE, Migration.LAST_MIGRATION_KEY);
-        rm.add(new QueryPath(Migration.SCHEMA_CF, null, 
Migration.LAST_MIGRATION_KEY), UUIDGen.decompose(version), new 
TimestampClock(now));
+        rm.add(new QueryPath(Migration.SCHEMA_CF, null, 
Migration.LAST_MIGRATION_KEY),
+               UUIDGen.decompose(version),
+               now);
         rm.apply();
     }
 
     /** loads a version of keyspace definitions from storage */
     public static synchronized Collection<KSMetaData> loadFromStorage(UUID 
version) throws IOException
     {
-        DecoratedKey vkey = 
StorageService.getPartitioner().decorateKey(Migration.toBytes(version));
+        DecoratedKey vkey = 
StorageService.getPartitioner().decorateKey(Migration.toUTF8Bytes(version));
         Table defs = Table.open(Table.SYSTEM_TABLE);
         ColumnFamilyStore cfStore = 
defs.getColumnFamilyStore(Migration.SCHEMA_CF);
-        QueryFilter filter = QueryFilter.getSliceFilter(vkey, new 
QueryPath(Migration.SCHEMA_CF), "".getBytes(), "".getBytes(), null, false, 
1024);
+        QueryFilter filter = QueryFilter.getIdentityFilter(vkey, new 
QueryPath(Migration.SCHEMA_CF));
         ColumnFamily cf = cfStore.getColumnFamily(filter);
-        Collection<KSMetaData> tables = new ArrayList<KSMetaData>();
-        for (IColumn col : cf.getSortedColumns())
+        IColumn avroschema = cf.getColumn(DEFINITION_SCHEMA_COLUMN_NAME);
+        if (avroschema == null)
+            // TODO: more polite way to handle this?
+            throw new RuntimeException("Cannot read system table! Are you 
upgrading a pre-release version?");
+        Schema schema = Schema.parse(new String(avroschema.value()));
+
+        // deserialize keyspaces using schema
+        Collection<KSMetaData> keyspaces = new ArrayList<KSMetaData>();
+        try
+        {
+            for (IColumn column : cf.getSortedColumns())
+            {
+                if (Arrays.equals(column.name(), 
DEFINITION_SCHEMA_COLUMN_NAME))
+                    continue;
+                org.apache.cassandra.avro.KsDef ks = 
SerDeUtils.<org.apache.cassandra.avro.KsDef>deserialize(schema, column.value());
+                keyspaces.add(KSMetaData.inflate(ks));
+            }
+        }
+        catch (ConfigurationException e)
         {
-            //  don't allow deleted columns.
-            if (col instanceof DeletedColumn)
-                continue;
-            KSMetaData ks = KSMetaData.deserialize(new 
ByteArrayInputStream(col.value()));
-            tables.add(ks);
+            throw new IOException(e);
         }
-        return tables;
+        return keyspaces;
     }
     
     /** gets all the files that belong to a given column family. */

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/db/migration/AddColumnFamily.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/migration/AddColumnFamily.java?rev=966277&r1=966276&r2=966277&view=diff
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/db/migration/AddColumnFamily.java 
(original)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/db/migration/AddColumnFamily.java 
Wed Jul 21 15:14:25 2010
@@ -1,5 +1,8 @@
 package org.apache.cassandra.db.migration;
 
+import org.apache.avro.Schema;
+import org.apache.avro.io.BinaryDecoder;
+
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ConfigurationException;
 import org.apache.cassandra.config.DatabaseDescriptor;
@@ -8,6 +11,7 @@ import org.apache.cassandra.db.RowMutati
 import org.apache.cassandra.db.Table;
 import org.apache.cassandra.db.commitlog.CommitLog;
 import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.io.SerDeUtils;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.UUIDGen;
 
@@ -45,9 +49,11 @@ public class AddColumnFamily extends Mig
     {
         super(UUIDGen.makeType1UUID(din), UUIDGen.makeType1UUID(din));
         rm = RowMutation.serializer().deserialize(din);
+
+        // deserialize cf
         try
         {
-            cfm = CFMetaData.deserialize(din);
+            cfm = 
CFMetaData.inflate(SerDeUtils.<org.apache.cassandra.avro.CfDef>deserializeWithSchema(FBUtilities.readShortByteArray(din)));
         }
         catch (ConfigurationException e)
         {
@@ -115,7 +121,9 @@ public class AddColumnFamily extends Mig
             dos.write(UUIDGen.decompose(addColumnFamily.newVersion));
             dos.write(UUIDGen.decompose(addColumnFamily.lastVersion));
             RowMutation.serializer().serialize(addColumnFamily.rm, dos);
-            dos.write(CFMetaData.serialize(addColumnFamily.cfm));
+            // serialize the added cf
+            // TODO: sloppy, but migrations should be converted to Avro soon 
anyway
+            
FBUtilities.writeShortByteArray(SerDeUtils.serializeWithSchema(addColumnFamily.cfm.deflate()),
 dos);
         }
 
         public AddColumnFamily deserialize(DataInputStream dis) throws 
IOException

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/db/migration/AddKeyspace.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/migration/AddKeyspace.java?rev=966277&r1=966276&r2=966277&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/migration/AddKeyspace.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/migration/AddKeyspace.java 
Wed Jul 21 15:14:25 2010
@@ -18,6 +18,9 @@
 
 package org.apache.cassandra.db.migration;
 
+import org.apache.avro.Schema;
+import org.apache.avro.io.BinaryDecoder;
+
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ConfigurationException;
 import org.apache.cassandra.config.DatabaseDescriptor;
@@ -26,6 +29,7 @@ import org.apache.cassandra.db.RowMutati
 import org.apache.cassandra.db.Table;
 import org.apache.cassandra.db.commitlog.CommitLog;
 import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.io.SerDeUtils;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.UUIDGen;
 
@@ -43,7 +47,16 @@ public class AddKeyspace extends Migrati
     {
         super(UUIDGen.makeType1UUID(din), UUIDGen.makeType1UUID(din));
         rm = RowMutation.serializer().deserialize(din);
-        ksm = KSMetaData.deserialize(din);
+
+        // deserialize ks
+        try
+        {
+            ksm = 
KSMetaData.inflate(SerDeUtils.<org.apache.cassandra.avro.KsDef>deserializeWithSchema(FBUtilities.readShortByteArray(din)));
+        }
+        catch (ConfigurationException e)
+        {
+            throw new IOException(e);
+        }
     }
     
     public AddKeyspace(KSMetaData ksm) throws ConfigurationException, 
IOException
@@ -96,7 +109,9 @@ public class AddKeyspace extends Migrati
             dos.write(UUIDGen.decompose(addKeyspace.newVersion));
             dos.write(UUIDGen.decompose(addKeyspace.lastVersion));
             RowMutation.serializer().serialize(addKeyspace.rm, dos);
-            dos.write(KSMetaData.serialize(addKeyspace.ksm));
+            // serialize the added ks
+            // TODO: sloppy, but migrations should be converted to Avro soon 
anyway
+            
FBUtilities.writeShortByteArray(SerDeUtils.serializeWithSchema(addKeyspace.ksm.deflate()),
 dos);
         }
 
         public AddKeyspace deserialize(DataInputStream dis) throws IOException

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/db/migration/Migration.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/migration/Migration.java?rev=966277&r1=966276&r2=966277&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/migration/Migration.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/migration/Migration.java 
Wed Jul 21 15:14:25 2010
@@ -39,6 +39,8 @@ import org.apache.cassandra.db.filter.Qu
 import org.apache.cassandra.gms.ApplicationState;
 import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.io.SerDeUtils;
+import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.service.MigrationManager;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.UUIDGen;
@@ -111,12 +113,12 @@ public abstract class Migration
             migration.add(new QueryPath(MIGRATIONS_CF, null, 
UUIDGen.decompose(newVersion)), buf, new TimestampClock(now));
             migration.apply();
             
-            // note that we storing this in the system table, which is not 
replicated, instead of the definitions table, which is.
+            // note that we're storing this in the system table, which is not 
replicated
             logger.debug("Applying migration " + newVersion.toString());
             migration = new RowMutation(Table.SYSTEM_TABLE, 
LAST_MIGRATION_KEY);
             migration.add(new QueryPath(SCHEMA_CF, null, LAST_MIGRATION_KEY), 
UUIDGen.decompose(newVersion), new TimestampClock(now));
             migration.apply();
-            
+
             // if we fail here, there will be schema changes in the CL that 
will get replayed *AFTER* the schema is loaded.
             // CassandraDaemon checks for this condition (the stored version 
will be greater than the loaded version)
             // and calls MigrationManager.applyMigrations(loaded version, 
stored version).
@@ -131,6 +133,9 @@ public abstract class Migration
                 flushes.add(cfs.forceFlush());
             for (Future f : flushes)
             {
+                if (f == null)
+                    // applying the migration triggered a flush independently
+                    continue;
                 try
                 {
                     f.get();
@@ -194,23 +199,36 @@ public abstract class Migration
         return newVersion;
     }
 
+    /**
+     * Definitions are serialized as a row with a UUID key, with a magical 
column named DEFINITION_SCHEMA_COLUMN_NAME
+     * (containing the Avro Schema) and a column per keyspace. Each keyspace 
column contains an avro.KsDef object
+     * encoded with the Avro schema.
+     */
     static RowMutation makeDefinitionMutation(KSMetaData add, KSMetaData 
remove, UUID versionId) throws IOException
     {
-        final long now = System.currentTimeMillis();
-        RowMutation rm = new RowMutation(Table.SYSTEM_TABLE, 
toBytes(versionId));
-        if (remove != null)
-            rm.delete(new QueryPath(SCHEMA_CF, null, remove.name.getBytes()), 
new TimestampClock(System.currentTimeMillis()));
-        if (add != null)
-            rm.add(new QueryPath(SCHEMA_CF, null, add.name.getBytes()), 
KSMetaData.serialize(add), new TimestampClock(now));
-        
-        // include all other key spaces.
+        // collect all keyspaces, while removing 'remove' and adding 'add'
+        List<KSMetaData> ksms = new ArrayList<KSMetaData>();
         for (String tableName : DatabaseDescriptor.getNonSystemTables())
         {
-            if (add != null && add.name.equals(tableName) || remove != null && 
remove.name.equals(tableName))
+            if (remove != null && remove.name.equals(tableName) || add != null 
&& add.name.equals(tableName))
                 continue;
-            KSMetaData ksm = DatabaseDescriptor.getTableDefinition(tableName);
-            rm.add(new QueryPath(SCHEMA_CF, null, ksm.name.getBytes()), 
KSMetaData.serialize(ksm), new TimestampClock(now));
+            ksms.add(DatabaseDescriptor.getTableDefinition(tableName));
         }
+        if (add != null)
+            ksms.add(add);
+
+        // wrap in mutation
+        RowMutation rm = new RowMutation(Table.SYSTEM_TABLE, 
toUTF8Bytes(versionId));
+        TimestampClock now = new TimestampClock(System.currentTimeMillis());
+        // add a column for each keyspace
+        for (KSMetaData ksm : ksms)
+            rm.add(new QueryPath(SCHEMA_CF, null, ksm.name.getBytes(UTF_8)), 
SerDeUtils.serialize(ksm.deflate()), now);
+        // add the schema
+        rm.add(new QueryPath(SCHEMA_CF,
+                             null,
+                             DefsTable.DEFINITION_SCHEMA_COLUMN_NAME),
+                             
org.apache.cassandra.avro.KsDef.SCHEMA$.toString().getBytes(UTF_8),
+                             now);
         return rm;
     }
     
@@ -265,7 +283,7 @@ public abstract class Migration
         return cf.getSortedColumns();
     }
     
-    public static byte[] toBytes(UUID version)
+    public static byte[] toUTF8Bytes(UUID version)
     {
         return version.toString().getBytes(UTF_8);
     }

Modified: 
cassandra/trunk/test/unit/org/apache/cassandra/config/ColumnDefinitionTest.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/config/ColumnDefinitionTest.java?rev=966277&r1=966276&r2=966277&view=diff
==============================================================================
--- 
cassandra/trunk/test/unit/org/apache/cassandra/config/ColumnDefinitionTest.java 
(original)
+++ 
cassandra/trunk/test/unit/org/apache/cassandra/config/ColumnDefinitionTest.java 
Wed Jul 21 15:14:25 2010
@@ -25,7 +25,7 @@ public class ColumnDefinitionTest
 
     protected void testSerializeDeserialize(ColumnDefinition cd) throws 
Exception
     {
-        ColumnDefinition newCd = 
ColumnDefinition.deserialize(ColumnDefinition.serialize(cd));
+        ColumnDefinition newCd = ColumnDefinition.inflate(cd.deflate());
         assert cd != newCd;
         assert cd.hashCode() == newCd.hashCode();
         assert cd.equals(newCd);

Modified: 
cassandra/trunk/test/unit/org/apache/cassandra/config/DatabaseDescriptorTest.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/config/DatabaseDescriptorTest.java?rev=966277&r1=966276&r2=966277&view=diff
==============================================================================
--- 
cassandra/trunk/test/unit/org/apache/cassandra/config/DatabaseDescriptorTest.java
 (original)
+++ 
cassandra/trunk/test/unit/org/apache/cassandra/config/DatabaseDescriptorTest.java
 Wed Jul 21 15:14:25 2010
@@ -20,9 +20,13 @@ package org.apache.cassandra.config;
 
 import static org.junit.Assert.assertNotNull;
 
+import org.apache.avro.specific.SpecificRecord;
+
 import org.apache.cassandra.CleanupHelper;
 import org.apache.cassandra.db.migration.AddKeyspace;
 import org.apache.cassandra.locator.RackUnawareStrategy;
+import org.apache.cassandra.io.SerDeUtils;
+import org.apache.cassandra.io.util.OutputBuffer;
 import org.junit.Test;
 
 import java.io.ByteArrayInputStream;
@@ -31,6 +35,13 @@ import java.util.UUID;
 
 public class DatabaseDescriptorTest
 {
+    protected <D extends SpecificRecord> D serDe(D record) throws IOException
+    {
+        D actual = SerDeUtils.<D>deserialize(record.getSchema(), 
SerDeUtils.serialize(record));
+        assert actual.equals(record) : actual + " != " + record;
+        return actual;
+    }
+
     @Test
     public void testShouldHaveConfigFileNameAvailable()
     {
@@ -45,22 +56,19 @@ public class DatabaseDescriptorTest
         {
             for (CFMetaData cfm : 
DatabaseDescriptor.getTableMetaData(table).values())
             {
-                byte[] ser = CFMetaData.serialize(cfm);
-                CFMetaData cfmDupe = CFMetaData.deserialize(new 
ByteArrayInputStream(ser));
+                CFMetaData cfmDupe = CFMetaData.inflate(serDe(cfm.deflate()));
                 assert cfmDupe != null;
                 assert cfmDupe.equals(cfm);
             }
         }
-
     }
 
     @Test
-    public void testKSMetaDataSerialization() throws IOException 
+    public void testKSMetaDataSerialization() throws IOException, 
ConfigurationException
     {
         for (KSMetaData ksm : DatabaseDescriptor.tables.values())
         {
-            byte[] ser = KSMetaData.serialize(ksm);
-            KSMetaData ksmDupe = KSMetaData.deserialize(new 
ByteArrayInputStream(ser));
+            KSMetaData ksmDupe = KSMetaData.inflate(serDe(ksm.deflate()));
             assert ksmDupe != null;
             assert ksmDupe.equals(ksm);
         }

Modified: cassandra/trunk/test/unit/org/apache/cassandra/db/DefsTest.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/DefsTest.java?rev=966277&r1=966276&r2=966277&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/db/DefsTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/db/DefsTest.java Wed Jul 21 
15:14:25 2010
@@ -58,7 +58,7 @@ public class DefsTest extends CleanupHel
             assert defined.equals(loaded);
         }
     }
-     
+    
     @Test
     public void addNewCfToBogusTable() throws InterruptedException
     {


Reply via email to