Author: gdusbabek
Date: Wed Jul 21 15:14:25 2010
New Revision: 966277
URL: http://svn.apache.org/viewvc?rev=966277&view=rev
Log:
use avro serialization for KSM, CFS and parts of Migrations. patch by stuhood,
reviewed by gdusbabek. CASSANDRA-1186
Modified:
cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java
cassandra/trunk/src/java/org/apache/cassandra/config/ColumnDefinition.java
cassandra/trunk/src/java/org/apache/cassandra/config/ConfigurationException.java
cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
cassandra/trunk/src/java/org/apache/cassandra/config/KSMetaData.java
cassandra/trunk/src/java/org/apache/cassandra/db/DefsTable.java
cassandra/trunk/src/java/org/apache/cassandra/db/migration/AddColumnFamily.java
cassandra/trunk/src/java/org/apache/cassandra/db/migration/AddKeyspace.java
cassandra/trunk/src/java/org/apache/cassandra/db/migration/Migration.java
cassandra/trunk/test/unit/org/apache/cassandra/config/ColumnDefinitionTest.java
cassandra/trunk/test/unit/org/apache/cassandra/config/DatabaseDescriptorTest.java
cassandra/trunk/test/unit/org/apache/cassandra/db/DefsTest.java
Modified: cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java?rev=966277&r1=966276&r2=966277&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java Wed
Jul 21 15:14:25 2010
@@ -22,20 +22,25 @@ import java.io.*;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
+import com.google.common.collect.*;
+import org.apache.avro.Schema;
+import org.apache.avro.util.Utf8;
import org.apache.commons.lang.builder.EqualsBuilder;
import org.apache.commons.lang.builder.HashCodeBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.BiMap;
-import com.google.common.collect.HashBiMap;
-import org.apache.cassandra.db.*;
+import org.apache.cassandra.io.SerDeUtils;
+import org.apache.cassandra.db.ColumnFamilyType;
+import org.apache.cassandra.db.ClockType;
import org.apache.cassandra.db.clock.AbstractReconciler;
import org.apache.cassandra.db.clock.TimestampReconciler;
-import org.apache.cassandra.db.marshal.*;
+import org.apache.cassandra.db.HintedHandOffManager;
+import org.apache.cassandra.db.SystemTable;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.BytesType;
+import org.apache.cassandra.db.marshal.TimeUUIDType;
+import org.apache.cassandra.db.marshal.UTF8Type;
import org.apache.cassandra.db.migration.Migration;
-import org.apache.cassandra.locator.DatacenterShardStrategy;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.Pair;
@@ -48,8 +53,6 @@ public final class CFMetaData
public final static boolean DEFAULT_PRELOAD_ROW_CACHE = false;
private static final int MIN_CF_ID = 1000;
- private static final Logger logger =
LoggerFactory.getLogger(DatacenterShardStrategy.class);
-
private static final AtomicInteger idGen = new AtomicInteger(MIN_CF_ID);
private static final Map<Integer, String> currentCfNames = new
HashMap<Integer, String>();
@@ -114,7 +117,6 @@ public final class CFMetaData
public final Integer cfId;
public boolean preloadRowCache;
- // BytesToken because byte[].hashCode|equals is inherited from Object.
gggrrr...
public final Map<byte[], ColumnDefinition> column_metadata;
private CFMetaData(String tableName,
@@ -142,7 +144,7 @@ public final class CFMetaData
// cfType == Super, subcolumnComparator should default to BytesType if
not set.
this.subcolumnComparator = subcolumnComparator == null && cfType ==
ColumnFamilyType.Super ? BytesType.instance : subcolumnComparator;
this.reconciler = reconciler;
- this.comment = comment;
+ this.comment = comment == null ? "" : comment;
this.rowCacheSize = rowCacheSize;
this.preloadRowCache = preloadRowCache;
this.keyCacheSize = keyCacheSize;
@@ -198,74 +200,53 @@ public final class CFMetaData
+ "Columns Sorted By: " + comparator + "\n";
}
- public static byte[] serialize(CFMetaData cfm) throws IOException
+ public org.apache.cassandra.avro.CfDef deflate()
{
- ByteArrayOutputStream bout = new ByteArrayOutputStream();
- DataOutputStream dout = new DataOutputStream(bout);
- dout.writeUTF(cfm.tableName);
- dout.writeUTF(cfm.cfName);
- dout.writeUTF(cfm.cfType.name());
- dout.writeUTF(cfm.clockType.name());
- dout.writeUTF(cfm.comparator.getClass().getName());
- dout.writeBoolean(cfm.subcolumnComparator != null);
- if (cfm.subcolumnComparator != null)
- dout.writeUTF(cfm.subcolumnComparator.getClass().getName());
- dout.writeUTF(cfm.reconciler.getClass().getName());
- dout.writeBoolean(cfm.comment != null);
- if (cfm.comment != null)
- dout.writeUTF(cfm.comment);
- dout.writeDouble(cfm.rowCacheSize);
- dout.writeBoolean(cfm.preloadRowCache);
- dout.writeDouble(cfm.keyCacheSize);
- dout.writeDouble(cfm.readRepairChance);
- dout.writeInt(cfm.cfId);
- dout.writeInt(cfm.column_metadata.size());
- for (ColumnDefinition cd : cfm.column_metadata.values())
- {
- byte[] cdBytes = ColumnDefinition.serialize(cd);
- dout.writeInt(cdBytes.length);
- dout.write(cdBytes);
- }
- dout.close();
- return bout.toByteArray();
+ org.apache.cassandra.avro.CfDef cf = new
org.apache.cassandra.avro.CfDef();
+ cf.id = cfId;
+ cf.keyspace = new Utf8(tableName);
+ cf.name = new Utf8(cfName);
+ cf.column_type = new Utf8(cfType.name());
+ cf.clock_type = new Utf8(clockType.name());
+ cf.comparator_type = new Utf8(comparator.getClass().getName());
+ if (subcolumnComparator != null)
+ cf.subcomparator_type = new
Utf8(subcolumnComparator.getClass().getName());
+ cf.reconciler = new Utf8(reconciler.getClass().getName());
+ cf.comment = new Utf8(comment);
+ cf.row_cache_size = rowCacheSize;
+ cf.key_cache_size = keyCacheSize;
+ cf.preload_row_cache = preloadRowCache;
+ cf.read_repair_chance = readRepairChance;
+ cf.column_metadata = SerDeUtils.createArray(column_metadata.size(),
+
org.apache.cassandra.avro.ColumnDef.SCHEMA$);
+ for (ColumnDefinition cd : column_metadata.values())
+ cf.column_metadata.add(cd.deflate());
+ return cf;
}
- public static CFMetaData deserialize(InputStream in) throws IOException,
ConfigurationException
+ public static CFMetaData inflate(org.apache.cassandra.avro.CfDef cf)
throws ConfigurationException
{
- DataInputStream din = new DataInputStream(in);
- String tableName = din.readUTF();
- String cfName = din.readUTF();
- ColumnFamilyType cfType = ColumnFamilyType.create(din.readUTF());
- ClockType clockType = ClockType.create(din.readUTF());
- AbstractType comparator =
DatabaseDescriptor.getComparator(din.readUTF());
+ AbstractType comparator =
DatabaseDescriptor.getComparator(cf.comparator_type.toString());
AbstractType subcolumnComparator = null;
- subcolumnComparator = din.readBoolean() ?
DatabaseDescriptor.getComparator(din.readUTF()) : null;
+ if (cf.subcomparator_type != null)
+ subcolumnComparator =
DatabaseDescriptor.getComparator(cf.subcomparator_type.toString());
AbstractReconciler reconciler = null;
try
{
- reconciler =
(AbstractReconciler)Class.forName(din.readUTF()).newInstance();
+ reconciler =
(AbstractReconciler)Class.forName(cf.reconciler.toString()).newInstance();
}
catch (Exception ex)
{
- throw new IOException(ex);
+ throw new ConfigurationException("Could not create Reconciler of
type " + cf.reconciler, ex);
}
- String comment = din.readBoolean() ? din.readUTF() : null;
- double rowCacheSize = din.readDouble();
- boolean preloadRowCache = din.readBoolean();
- double keyCacheSize = din.readDouble();
- double readRepairChance = din.readDouble();
- int cfId = din.readInt();
- int columnMetadataEntries = din.readInt();
Map<byte[], ColumnDefinition> column_metadata = new TreeMap<byte[],
ColumnDefinition>(FBUtilities.byteArrayComparator);
- for (int i = 0; i < columnMetadataEntries; i++)
+ Iterator<org.apache.cassandra.avro.ColumnDef> cditer =
cf.column_metadata.iterator();
+ while (cditer.hasNext())
{
- int cdSize = din.readInt();
- byte[] cdBytes = new byte[cdSize];
- din.readFully(cdBytes);
- ColumnDefinition cd = ColumnDefinition.deserialize(cdBytes);
+ ColumnDefinition cd = ColumnDefinition.inflate(cditer.next());
column_metadata.put(cd.name, cd);
}
- return new CFMetaData(tableName, cfName, cfType, clockType,
comparator, subcolumnComparator, reconciler, comment, rowCacheSize,
preloadRowCache, keyCacheSize, readRepairChance, cfId, column_metadata);
+ return new CFMetaData(cf.keyspace.toString(), cf.name.toString(),
ColumnFamilyType.create(cf.column_type.toString()),
ClockType.create(cf.clock_type.toString()), comparator, subcolumnComparator,
reconciler, cf.comment.toString(), cf.row_cache_size, cf.preload_row_cache,
cf.key_cache_size, cf.read_repair_chance, cf.id, column_metadata);
}
public boolean equals(Object obj)
Modified:
cassandra/trunk/src/java/org/apache/cassandra/config/ColumnDefinition.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/ColumnDefinition.java?rev=966277&r1=966276&r2=966277&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/ColumnDefinition.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/ColumnDefinition.java
Wed Jul 21 15:14:25 2010
@@ -1,8 +1,9 @@
package org.apache.cassandra.config;
-import java.io.*;
+import java.nio.ByteBuffer;
import java.util.*;
+import org.apache.avro.util.Utf8;
import org.apache.commons.lang.builder.EqualsBuilder;
import org.apache.commons.lang.builder.HashCodeBuilder;
@@ -53,45 +54,27 @@ public class ColumnDefinition {
return result;
}
- public static byte[] serialize(ColumnDefinition cd) throws IOException
+ public org.apache.cassandra.avro.ColumnDef deflate()
{
- ByteArrayOutputStream bout = new ByteArrayOutputStream();
- DataOutputStream out = new DataOutputStream(bout);
- out.writeInt(cd.name.length);
- out.write(cd.name);
- out.writeUTF(cd.validator.getClass().getName());
-
- out.writeBoolean(cd.index_type != null);
- if (cd.index_type != null)
- out.writeInt(cd.index_type.ordinal());
-
- out.writeBoolean(cd.index_name != null);
- if (cd.index_name != null)
- out.writeUTF(cd.index_name);
-
- out.close();
- return bout.toByteArray();
- }
-
- public static ColumnDefinition deserialize(byte[] bytes) throws IOException
- {
- DataInputStream in = new DataInputStream(new
ByteArrayInputStream(bytes));
- int nameSize = in.readInt();
- byte[] name = new byte[nameSize];
- in.readFully(name);
- String validation_class = in.readUTF();
-
- IndexType index_type = null;
- if (in.readBoolean())
- index_type = IndexType.values()[in.readInt()];
-
- String index_name = null;
- if (in.readBoolean())
- index_name = in.readUTF();
-
+ org.apache.cassandra.avro.ColumnDef cd = new
org.apache.cassandra.avro.ColumnDef();
+ cd.name = ByteBuffer.wrap(name);
+ cd.validation_class = new Utf8(validator.getClass().getName());
+ cd.index_type = index_type == null ? null :
+ Enum.valueOf(org.apache.cassandra.avro.IndexType.class,
index_type.name());
+ cd.index_name = index_name == null ? null : new Utf8(index_name);
+ return cd;
+ }
+
+ public static ColumnDefinition inflate(org.apache.cassandra.avro.ColumnDef
cd) throws ConfigurationException
+ {
+ byte[] name = new byte[cd.name.remaining()];
+ cd.name.get(name, 0, name.length);
+ IndexType index_type = cd.index_type == null ? null :
+ Enum.valueOf(IndexType.class, cd.index_type.name());
+ String index_name = cd.index_name == null ? null :
cd.index_name.toString();
try
{
- return new ColumnDefinition(name, validation_class, index_type,
index_name);
+ return new ColumnDefinition(name, cd.validation_class.toString(),
index_type, index_name);
}
catch (ConfigurationException e)
{
Modified:
cassandra/trunk/src/java/org/apache/cassandra/config/ConfigurationException.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/ConfigurationException.java?rev=966277&r1=966276&r2=966277&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/config/ConfigurationException.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/config/ConfigurationException.java
Wed Jul 21 15:14:25 2010
@@ -18,8 +18,6 @@
package org.apache.cassandra.config;
-import java.io.IOException;
-
public class ConfigurationException extends Exception
{
public ConfigurationException(String message)
@@ -27,8 +25,8 @@ public class ConfigurationException exte
super(message);
}
- public ConfigurationException(String message, IOException ioe)
+ public ConfigurationException(String message, Exception e)
{
- super(message, ioe);
+ super(message, e);
}
}
Modified:
cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=966277&r1=966276&r2=966277&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
Wed Jul 21 15:14:25 2010
@@ -62,8 +62,6 @@ public class DatabaseDescriptor
{
private static Logger logger =
LoggerFactory.getLogger(DatabaseDescriptor.class);
- public static final String random = "RANDOM";
- public static final String ophf = "OPHF";
private static IEndpointSnitch snitch;
private static InetAddress listenAddress; // leave null so we can fall
through to getLocalHost
private static InetAddress rpcAddress;
Modified: cassandra/trunk/src/java/org/apache/cassandra/config/KSMetaData.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/KSMetaData.java?rev=966277&r1=966276&r2=966277&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/KSMetaData.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/KSMetaData.java Wed
Jul 21 15:14:25 2010
@@ -19,18 +19,17 @@
package org.apache.cassandra.config;
import org.apache.cassandra.locator.AbstractReplicationStrategy;
+import org.apache.cassandra.locator.RackUnawareStrategy;
+import org.apache.cassandra.io.SerDeUtils;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import org.apache.avro.util.Utf8;
import org.apache.commons.lang.ObjectUtils;
public final class KSMetaData
@@ -43,7 +42,7 @@ public final class KSMetaData
public KSMetaData(String name, Class<? extends
AbstractReplicationStrategy> strategyClass, int replicationFactor,
CFMetaData... cfDefs)
{
this.name = name;
- this.strategyClass = strategyClass;
+ this.strategyClass = strategyClass == null ? RackUnawareStrategy.class
: strategyClass;
this.replicationFactor = replicationFactor;
Map<String, CFMetaData> cfmap = new HashMap<String, CFMetaData>();
for (CFMetaData cfm : cfDefs)
@@ -70,50 +69,35 @@ public final class KSMetaData
return cfMetaData;
}
- public static byte[] serialize(KSMetaData ksm) throws IOException
+ public org.apache.cassandra.avro.KsDef deflate()
{
- ByteArrayOutputStream bout = new ByteArrayOutputStream();
- DataOutputStream dout = new DataOutputStream(bout);
- dout.writeUTF(ksm.name);
- dout.writeBoolean(ksm.strategyClass != null);
- if (ksm.strategyClass != null)
- dout.writeUTF(ksm.strategyClass.getName());
- dout.writeInt(ksm.replicationFactor);
- dout.writeInt(ksm.cfMetaData.size());
- for (CFMetaData cfm : ksm.cfMetaData.values())
- dout.write(CFMetaData.serialize(cfm));
- dout.close();
- return bout.toByteArray();
+ org.apache.cassandra.avro.KsDef ks = new
org.apache.cassandra.avro.KsDef();
+ ks.name = new Utf8(name);
+ ks.strategy_class = new Utf8(strategyClass.getName());
+ ks.replication_factor = replicationFactor;
+ ks.cf_defs = SerDeUtils.createArray(cfMetaData.size(),
org.apache.cassandra.avro.CfDef.SCHEMA$);
+ for (CFMetaData cfm : cfMetaData.values())
+ ks.cf_defs.add(cfm.deflate());
+ return ks;
}
- public static KSMetaData deserialize(InputStream in) throws IOException
+ public static KSMetaData inflate(org.apache.cassandra.avro.KsDef ks)
throws ConfigurationException
{
- DataInputStream din = new DataInputStream(in);
- String name = din.readUTF();
Class<AbstractReplicationStrategy> repStratClass = null;
try
{
- repStratClass = din.readBoolean() ?
(Class<AbstractReplicationStrategy>)Class.forName(din.readUTF()) : null;
+ repStratClass =
(Class<AbstractReplicationStrategy>)Class.forName(ks.strategy_class.toString());
}
catch (Exception ex)
{
- throw new IOException(ex);
+ throw new ConfigurationException("Could not create
ReplicationStrategy of type " + ks.strategy_class, ex);
}
- int replicationFactor = din.readInt();
- int cfsz = din.readInt();
+ int cfsz = (int)ks.cf_defs.size();
CFMetaData[] cfMetaData = new CFMetaData[cfsz];
+ Iterator<org.apache.cassandra.avro.CfDef> cfiter =
ks.cf_defs.iterator();
for (int i = 0; i < cfsz; i++)
- {
- try
- {
- cfMetaData[i] = CFMetaData.deserialize(din);
- }
- catch (ConfigurationException e)
- {
- throw new IOException(e);
- }
- }
+ cfMetaData[i] = CFMetaData.inflate(cfiter.next());
- return new KSMetaData(name, repStratClass, replicationFactor,
cfMetaData);
+ return new KSMetaData(ks.name.toString(), repStratClass,
ks.replication_factor, cfMetaData);
}
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/DefsTable.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/DefsTable.java?rev=966277&r1=966276&r2=966277&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/DefsTable.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/DefsTable.java Wed Jul 21
15:14:25 2010
@@ -18,6 +18,9 @@
package org.apache.cassandra.db;
+import org.apache.avro.Schema;
+import org.apache.avro.io.BinaryDecoder;
+
import org.apache.cassandra.config.ConfigurationException;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.KSMetaData;
@@ -26,12 +29,15 @@ import org.apache.cassandra.db.filter.Qu
import org.apache.cassandra.db.filter.QueryPath;
import org.apache.cassandra.db.filter.SliceQueryFilter;
import org.apache.cassandra.db.migration.Migration;
+import org.apache.cassandra.io.SerDeUtils;
+import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.UUIDGen;
import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
import java.io.File;
import java.io.FileFilter;
import java.io.IOException;
@@ -41,44 +47,77 @@ import java.util.Collection;
import java.util.List;
import java.util.UUID;
+import static com.google.common.base.Charsets.UTF_8;
+
public class DefsTable
{
+ // column name for the schema storing serialized keyspace definitions
+ // NB: must be an invalid keyspace name
+ public static final byte[] DEFINITION_SCHEMA_COLUMN_NAME =
"Avro/Schema".getBytes(UTF_8);
+
/** dumps current keyspace definitions to storage */
public static synchronized void dumpToStorage(UUID version) throws
IOException
{
- byte[] versionKey = Migration.toBytes(version);
- long now = System.currentTimeMillis();
+ final byte[] versionKey = Migration.toUTF8Bytes(version);
+
+ // build a list of keyspaces
+ Collection<String> ksnames = DatabaseDescriptor.getNonSystemTables();
+
+ // persist keyspaces under new version
RowMutation rm = new RowMutation(Table.SYSTEM_TABLE, versionKey);
- for (String tableName : DatabaseDescriptor.getNonSystemTables())
+ TimestampClock now = new TimestampClock(System.currentTimeMillis());
+ for (String ksname : ksnames)
{
- KSMetaData ks = DatabaseDescriptor.getTableDefinition(tableName);
- rm.add(new QueryPath(Migration.SCHEMA_CF, null,
ks.name.getBytes()), KSMetaData.serialize(ks), new TimestampClock(now));
+ KSMetaData ksm = DatabaseDescriptor.getTableDefinition(ksname);
+ rm.add(new QueryPath(Migration.SCHEMA_CF, null,
ksm.name.getBytes(UTF_8)), SerDeUtils.serialize(ksm.deflate()), now);
}
+ // add the schema
+ rm.add(new QueryPath(Migration.SCHEMA_CF,
+ null,
+ DEFINITION_SCHEMA_COLUMN_NAME),
+
org.apache.cassandra.avro.KsDef.SCHEMA$.toString().getBytes(UTF_8),
+ now);
rm.apply();
-
+
+ // apply new version
rm = new RowMutation(Table.SYSTEM_TABLE, Migration.LAST_MIGRATION_KEY);
- rm.add(new QueryPath(Migration.SCHEMA_CF, null,
Migration.LAST_MIGRATION_KEY), UUIDGen.decompose(version), new
TimestampClock(now));
+ rm.add(new QueryPath(Migration.SCHEMA_CF, null,
Migration.LAST_MIGRATION_KEY),
+ UUIDGen.decompose(version),
+ now);
rm.apply();
}
/** loads a version of keyspace definitions from storage */
public static synchronized Collection<KSMetaData> loadFromStorage(UUID
version) throws IOException
{
- DecoratedKey vkey =
StorageService.getPartitioner().decorateKey(Migration.toBytes(version));
+ DecoratedKey vkey =
StorageService.getPartitioner().decorateKey(Migration.toUTF8Bytes(version));
Table defs = Table.open(Table.SYSTEM_TABLE);
ColumnFamilyStore cfStore =
defs.getColumnFamilyStore(Migration.SCHEMA_CF);
- QueryFilter filter = QueryFilter.getSliceFilter(vkey, new
QueryPath(Migration.SCHEMA_CF), "".getBytes(), "".getBytes(), null, false,
1024);
+ QueryFilter filter = QueryFilter.getIdentityFilter(vkey, new
QueryPath(Migration.SCHEMA_CF));
ColumnFamily cf = cfStore.getColumnFamily(filter);
- Collection<KSMetaData> tables = new ArrayList<KSMetaData>();
- for (IColumn col : cf.getSortedColumns())
+ IColumn avroschema = cf.getColumn(DEFINITION_SCHEMA_COLUMN_NAME);
+ if (avroschema == null)
+ // TODO: more polite way to handle this?
+ throw new RuntimeException("Cannot read system table! Are you
upgrading a pre-release version?");
+ Schema schema = Schema.parse(new String(avroschema.value()));
+
+ // deserialize keyspaces using schema
+ Collection<KSMetaData> keyspaces = new ArrayList<KSMetaData>();
+ try
+ {
+ for (IColumn column : cf.getSortedColumns())
+ {
+ if (Arrays.equals(column.name(),
DEFINITION_SCHEMA_COLUMN_NAME))
+ continue;
+ org.apache.cassandra.avro.KsDef ks =
SerDeUtils.<org.apache.cassandra.avro.KsDef>deserialize(schema, column.value());
+ keyspaces.add(KSMetaData.inflate(ks));
+ }
+ }
+ catch (ConfigurationException e)
{
- // don't allow deleted columns.
- if (col instanceof DeletedColumn)
- continue;
- KSMetaData ks = KSMetaData.deserialize(new
ByteArrayInputStream(col.value()));
- tables.add(ks);
+ throw new IOException(e);
}
- return tables;
+ return keyspaces;
}
/** gets all the files that belong to a given column family. */
Modified:
cassandra/trunk/src/java/org/apache/cassandra/db/migration/AddColumnFamily.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/migration/AddColumnFamily.java?rev=966277&r1=966276&r2=966277&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/db/migration/AddColumnFamily.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/db/migration/AddColumnFamily.java
Wed Jul 21 15:14:25 2010
@@ -1,5 +1,8 @@
package org.apache.cassandra.db.migration;
+import org.apache.avro.Schema;
+import org.apache.avro.io.BinaryDecoder;
+
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.ConfigurationException;
import org.apache.cassandra.config.DatabaseDescriptor;
@@ -8,6 +11,7 @@ import org.apache.cassandra.db.RowMutati
import org.apache.cassandra.db.Table;
import org.apache.cassandra.db.commitlog.CommitLog;
import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.io.SerDeUtils;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.UUIDGen;
@@ -45,9 +49,11 @@ public class AddColumnFamily extends Mig
{
super(UUIDGen.makeType1UUID(din), UUIDGen.makeType1UUID(din));
rm = RowMutation.serializer().deserialize(din);
+
+ // deserialize cf
try
{
- cfm = CFMetaData.deserialize(din);
+ cfm =
CFMetaData.inflate(SerDeUtils.<org.apache.cassandra.avro.CfDef>deserializeWithSchema(FBUtilities.readShortByteArray(din)));
}
catch (ConfigurationException e)
{
@@ -115,7 +121,9 @@ public class AddColumnFamily extends Mig
dos.write(UUIDGen.decompose(addColumnFamily.newVersion));
dos.write(UUIDGen.decompose(addColumnFamily.lastVersion));
RowMutation.serializer().serialize(addColumnFamily.rm, dos);
- dos.write(CFMetaData.serialize(addColumnFamily.cfm));
+ // serialize the added cf
+ // TODO: sloppy, but migrations should be converted to Avro soon
anyway
+
FBUtilities.writeShortByteArray(SerDeUtils.serializeWithSchema(addColumnFamily.cfm.deflate()),
dos);
}
public AddColumnFamily deserialize(DataInputStream dis) throws
IOException
Modified:
cassandra/trunk/src/java/org/apache/cassandra/db/migration/AddKeyspace.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/migration/AddKeyspace.java?rev=966277&r1=966276&r2=966277&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/migration/AddKeyspace.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/migration/AddKeyspace.java
Wed Jul 21 15:14:25 2010
@@ -18,6 +18,9 @@
package org.apache.cassandra.db.migration;
+import org.apache.avro.Schema;
+import org.apache.avro.io.BinaryDecoder;
+
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.ConfigurationException;
import org.apache.cassandra.config.DatabaseDescriptor;
@@ -26,6 +29,7 @@ import org.apache.cassandra.db.RowMutati
import org.apache.cassandra.db.Table;
import org.apache.cassandra.db.commitlog.CommitLog;
import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.io.SerDeUtils;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.UUIDGen;
@@ -43,7 +47,16 @@ public class AddKeyspace extends Migrati
{
super(UUIDGen.makeType1UUID(din), UUIDGen.makeType1UUID(din));
rm = RowMutation.serializer().deserialize(din);
- ksm = KSMetaData.deserialize(din);
+
+ // deserialize ks
+ try
+ {
+ ksm =
KSMetaData.inflate(SerDeUtils.<org.apache.cassandra.avro.KsDef>deserializeWithSchema(FBUtilities.readShortByteArray(din)));
+ }
+ catch (ConfigurationException e)
+ {
+ throw new IOException(e);
+ }
}
public AddKeyspace(KSMetaData ksm) throws ConfigurationException,
IOException
@@ -96,7 +109,9 @@ public class AddKeyspace extends Migrati
dos.write(UUIDGen.decompose(addKeyspace.newVersion));
dos.write(UUIDGen.decompose(addKeyspace.lastVersion));
RowMutation.serializer().serialize(addKeyspace.rm, dos);
- dos.write(KSMetaData.serialize(addKeyspace.ksm));
+ // serialize the added ks
+ // TODO: sloppy, but migrations should be converted to Avro soon
anyway
+
FBUtilities.writeShortByteArray(SerDeUtils.serializeWithSchema(addKeyspace.ksm.deflate()),
dos);
}
public AddKeyspace deserialize(DataInputStream dis) throws IOException
Modified:
cassandra/trunk/src/java/org/apache/cassandra/db/migration/Migration.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/migration/Migration.java?rev=966277&r1=966276&r2=966277&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/migration/Migration.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/migration/Migration.java
Wed Jul 21 15:14:25 2010
@@ -39,6 +39,8 @@ import org.apache.cassandra.db.filter.Qu
import org.apache.cassandra.gms.ApplicationState;
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.io.SerDeUtils;
+import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.service.MigrationManager;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.UUIDGen;
@@ -111,12 +113,12 @@ public abstract class Migration
migration.add(new QueryPath(MIGRATIONS_CF, null,
UUIDGen.decompose(newVersion)), buf, new TimestampClock(now));
migration.apply();
- // note that we storing this in the system table, which is not
replicated, instead of the definitions table, which is.
+ // note that we're storing this in the system table, which is not
replicated
logger.debug("Applying migration " + newVersion.toString());
migration = new RowMutation(Table.SYSTEM_TABLE,
LAST_MIGRATION_KEY);
migration.add(new QueryPath(SCHEMA_CF, null, LAST_MIGRATION_KEY),
UUIDGen.decompose(newVersion), new TimestampClock(now));
migration.apply();
-
+
// if we fail here, there will be schema changes in the CL that
will get replayed *AFTER* the schema is loaded.
// CassandraDaemon checks for this condition (the stored version
will be greater than the loaded version)
// and calls MigrationManager.applyMigrations(loaded version,
stored version).
@@ -131,6 +133,9 @@ public abstract class Migration
flushes.add(cfs.forceFlush());
for (Future f : flushes)
{
+ if (f == null)
+ // applying the migration triggered a flush independently
+ continue;
try
{
f.get();
@@ -194,23 +199,36 @@ public abstract class Migration
return newVersion;
}
+ /**
+ * Definitions are serialized as a row with a UUID key, with a magical
column named DEFINITION_SCHEMA_COLUMN_NAME
+ * (containing the Avro Schema) and a column per keyspace. Each keyspace
column contains an avro.KsDef object
+ * encoded with the Avro schema.
+ */
static RowMutation makeDefinitionMutation(KSMetaData add, KSMetaData
remove, UUID versionId) throws IOException
{
- final long now = System.currentTimeMillis();
- RowMutation rm = new RowMutation(Table.SYSTEM_TABLE,
toBytes(versionId));
- if (remove != null)
- rm.delete(new QueryPath(SCHEMA_CF, null, remove.name.getBytes()),
new TimestampClock(System.currentTimeMillis()));
- if (add != null)
- rm.add(new QueryPath(SCHEMA_CF, null, add.name.getBytes()),
KSMetaData.serialize(add), new TimestampClock(now));
-
- // include all other key spaces.
+ // collect all keyspaces, while removing 'remove' and adding 'add'
+ List<KSMetaData> ksms = new ArrayList<KSMetaData>();
for (String tableName : DatabaseDescriptor.getNonSystemTables())
{
- if (add != null && add.name.equals(tableName) || remove != null &&
remove.name.equals(tableName))
+ if (remove != null && remove.name.equals(tableName) || add != null
&& add.name.equals(tableName))
continue;
- KSMetaData ksm = DatabaseDescriptor.getTableDefinition(tableName);
- rm.add(new QueryPath(SCHEMA_CF, null, ksm.name.getBytes()),
KSMetaData.serialize(ksm), new TimestampClock(now));
+ ksms.add(DatabaseDescriptor.getTableDefinition(tableName));
}
+ if (add != null)
+ ksms.add(add);
+
+ // wrap in mutation
+ RowMutation rm = new RowMutation(Table.SYSTEM_TABLE,
toUTF8Bytes(versionId));
+ TimestampClock now = new TimestampClock(System.currentTimeMillis());
+ // add a column for each keyspace
+ for (KSMetaData ksm : ksms)
+ rm.add(new QueryPath(SCHEMA_CF, null, ksm.name.getBytes(UTF_8)),
SerDeUtils.serialize(ksm.deflate()), now);
+ // add the schema
+ rm.add(new QueryPath(SCHEMA_CF,
+ null,
+ DefsTable.DEFINITION_SCHEMA_COLUMN_NAME),
+
org.apache.cassandra.avro.KsDef.SCHEMA$.toString().getBytes(UTF_8),
+ now);
return rm;
}
@@ -265,7 +283,7 @@ public abstract class Migration
return cf.getSortedColumns();
}
- public static byte[] toBytes(UUID version)
+ public static byte[] toUTF8Bytes(UUID version)
{
return version.toString().getBytes(UTF_8);
}
Modified:
cassandra/trunk/test/unit/org/apache/cassandra/config/ColumnDefinitionTest.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/config/ColumnDefinitionTest.java?rev=966277&r1=966276&r2=966277&view=diff
==============================================================================
---
cassandra/trunk/test/unit/org/apache/cassandra/config/ColumnDefinitionTest.java
(original)
+++
cassandra/trunk/test/unit/org/apache/cassandra/config/ColumnDefinitionTest.java
Wed Jul 21 15:14:25 2010
@@ -25,7 +25,7 @@ public class ColumnDefinitionTest
protected void testSerializeDeserialize(ColumnDefinition cd) throws
Exception
{
- ColumnDefinition newCd =
ColumnDefinition.deserialize(ColumnDefinition.serialize(cd));
+ ColumnDefinition newCd = ColumnDefinition.inflate(cd.deflate());
assert cd != newCd;
assert cd.hashCode() == newCd.hashCode();
assert cd.equals(newCd);
Modified:
cassandra/trunk/test/unit/org/apache/cassandra/config/DatabaseDescriptorTest.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/config/DatabaseDescriptorTest.java?rev=966277&r1=966276&r2=966277&view=diff
==============================================================================
---
cassandra/trunk/test/unit/org/apache/cassandra/config/DatabaseDescriptorTest.java
(original)
+++
cassandra/trunk/test/unit/org/apache/cassandra/config/DatabaseDescriptorTest.java
Wed Jul 21 15:14:25 2010
@@ -20,9 +20,13 @@ package org.apache.cassandra.config;
import static org.junit.Assert.assertNotNull;
+import org.apache.avro.specific.SpecificRecord;
+
import org.apache.cassandra.CleanupHelper;
import org.apache.cassandra.db.migration.AddKeyspace;
import org.apache.cassandra.locator.RackUnawareStrategy;
+import org.apache.cassandra.io.SerDeUtils;
+import org.apache.cassandra.io.util.OutputBuffer;
import org.junit.Test;
import java.io.ByteArrayInputStream;
@@ -31,6 +35,13 @@ import java.util.UUID;
public class DatabaseDescriptorTest
{
+ protected <D extends SpecificRecord> D serDe(D record) throws IOException
+ {
+ D actual = SerDeUtils.<D>deserialize(record.getSchema(),
SerDeUtils.serialize(record));
+ assert actual.equals(record) : actual + " != " + record;
+ return actual;
+ }
+
@Test
public void testShouldHaveConfigFileNameAvailable()
{
@@ -45,22 +56,19 @@ public class DatabaseDescriptorTest
{
for (CFMetaData cfm :
DatabaseDescriptor.getTableMetaData(table).values())
{
- byte[] ser = CFMetaData.serialize(cfm);
- CFMetaData cfmDupe = CFMetaData.deserialize(new
ByteArrayInputStream(ser));
+ CFMetaData cfmDupe = CFMetaData.inflate(serDe(cfm.deflate()));
assert cfmDupe != null;
assert cfmDupe.equals(cfm);
}
}
-
}
@Test
- public void testKSMetaDataSerialization() throws IOException
+ public void testKSMetaDataSerialization() throws IOException,
ConfigurationException
{
for (KSMetaData ksm : DatabaseDescriptor.tables.values())
{
- byte[] ser = KSMetaData.serialize(ksm);
- KSMetaData ksmDupe = KSMetaData.deserialize(new
ByteArrayInputStream(ser));
+ KSMetaData ksmDupe = KSMetaData.inflate(serDe(ksm.deflate()));
assert ksmDupe != null;
assert ksmDupe.equals(ksm);
}
Modified: cassandra/trunk/test/unit/org/apache/cassandra/db/DefsTest.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/DefsTest.java?rev=966277&r1=966276&r2=966277&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/db/DefsTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/db/DefsTest.java Wed Jul 21
15:14:25 2010
@@ -58,7 +58,7 @@ public class DefsTest extends CleanupHel
assert defined.equals(loaded);
}
}
-
+
@Test
public void addNewCfToBogusTable() throws InterruptedException
{