Hi,

> Should I just run command (in Cassandra 0.5 source folder?) like:
> patch -p1 -i 0001-Add-new-ExpiringColumn-class.patch
> for all of the five patches in your ticket?

Well, actually I lied. The patches were made for a version a little after 0.5.
If you really want to try, I attach a version of those patches that should
work with 0.5. (Only the first three patches are included; the fourth one is
for tests, so it is not necessary per se.) Apply them with your patch command.
Still, to compile the result you will have to regenerate the Thrift Java
interface (with ant gen-thrift-java), and for that you will have to install the
right svn revision of Thrift (which is libthrift-r820831 for 0.5). And if you
manage to make it work, you will have to dig into cassandra.thrift, as the
patches change it.

In the end, remember that this is not an official patch yet and it *will not*
make it into Cassandra in its current form. All I can tell you is that I need
those expiring columns for quite a few of my use cases, and I will do what I
can to get this feature included if and when possible.

> Also what’s your opinion on extending ExpiringColumn to expire a key
> completely? Otherwise it will be difficult to track what are expired or old
> rows in Cassandra.

I'm not sure how to make full rows (or even full superColumns, for that matter)
expire. What if you set a row to expire after some time and then add new
columns before this expiration? Should you update the expiration of the row?
If so, a row expires when its last column expires, which is almost what you
already get with expiring columns.
The one thing you may want, though, is that when all the columns of a row
expire (or, to be precise, get physically deleted), the row itself is deleted.
Looking at the code, I'm not convinced this happens, and I'm not sure why.
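
To make the point about rows concrete, here is a minimal sketch of "a row
expires when its last column expires". It is not code from the patch:
RowExpirySketch and its methods are made-up names, expiration times are assumed
to be seconds since the epoch (as in ExpiringColumn), and treating an empty row
as "not expired" is an arbitrary choice for the illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.OptionalInt;

// Hypothetical illustration, not part of the patch: a row holding only
// expiring columns is effectively gone once its last column has expired.
final class RowExpirySketch
{
    // Returns the time (seconds since epoch) at which the last column expires,
    // or an empty result if the row has no columns.
    static OptionalInt rowExpiration(List<Integer> columnExpirations)
    {
        return columnExpirations.stream().mapToInt(Integer::intValue).max();
    }

    // A row counts as expired when it has columns and all of them have expired.
    // Uses the same strict comparison as ExpiringColumn.isMarkedForDelete().
    static boolean isRowExpired(List<Integer> columnExpirations, int nowInSeconds)
    {
        OptionalInt last = rowExpiration(columnExpirations);
        return last.isPresent() && nowInSeconds > last.getAsInt();
    }

    public static void main(String[] args)
    {
        List<Integer> expirations = Arrays.asList(100, 130, 115);
        System.out.println(rowExpiration(expirations).getAsInt()); // 130
        System.out.println(isRowExpired(expirations, 120));        // false
        System.out.println(isRowExpired(expirations, 131));        // true
    }
}
```

Adding a column with a later expiration simply moves the maximum, which is why
a separate per-row expiration would add little here.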

--
Sylvain

>
>
>
> From: Weijun Li [mailto:weiju...@gmail.com]
> Sent: Tuesday, February 23, 2010 6:18 PM
> To: cassandra-user@incubator.apache.org
> Subject: Re: Strategy to delete/expire keys in cassandra
>
>
>
> Thanks for the answer.  A dumb question: how did you apply the patch file to
> 0.5 source? The link you gave doesn't mention that the patch is for 0.5??
>
> Also, this ExpiringColumn feature doesn't seem to expire key/row, meaning
> the number of keys will keep growing (even if you drop columns for them)
> unless you delete them. In your case, how do you manage deleting/expiring
> keys from Cassandra? Do you keep a list of keys somewhere and go through
> them once in a while?
>
> Thanks,
>
> -Weijun
>
> On Tue, Feb 23, 2010 at 2:26 AM, Sylvain Lebresne <sylv...@yakaz.com> wrote:
>
> Hi,
>
> Maybe the following ticket/patch may be what you are looking for:
> https://issues.apache.org/jira/browse/CASSANDRA-699
>
> It's flagged for 0.7 but as it breaks the API (and if I understand correctly
> the release plan) it may not make it in cassandra before 0.8 (and the
> patch will have to change to accommodate the change that will be
> made to the internals in 0.7).
>
> Anyway, what I can at least tell you is that I'm using the patch against
> 0.5 in a test cluster without problem so far.
>
>> 3)      Once keys are deleted, do you have to wait till next GC to clean
>> them from disk or memory (suppose you don’t run cleanup manually)? What’s
>> the strategy for Cassandra to handle deleted items (notify other replica
>> nodes, cleanup memory/disk, defrag/rebuild disk files, rebuild bloom
>> filter etc). I’m asking this because if the keys refresh very fast (i.e.,
>> high volume write/read and expiration is kind of short) how will the data
>> file grow and how does this impact the system performance.
>
> Items are deleted only during compaction, and you may actually have to
> wait for the GCGraceSeconds before deletion. This value is configurable in
> storage-conf.xml, but is 10 days by default. You can decrease this value
> but because of consistency (and the fact that you have to at least wait for
> compaction to occur) you will always have a delay before the actual delete
> (all this is also true for the patch I mention above by the way). But when
> it's deleted, it's just skipping the items during compaction, so it's really
> cheap.
>
> --
> Sylvain
>
>
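
The deletion delay described in the quoted reply above can be sketched as
follows. This is only an illustration under assumptions, not Cassandra's code:
GcGraceSketch and canPurge are made-up names, the 10-day default is the one
quoted in the thread, and the exact comparison (<= versus <) is a guess.

```java
// Hypothetical illustration of the GCGraceSeconds delay; not Cassandra's code.
final class GcGraceSketch
{
    // Default quoted in the thread: 10 days, expressed in seconds.
    static final int DEFAULT_GC_GRACE_SECONDS = 10 * 24 * 60 * 60;

    // A deleted (or expired) column may be dropped by a compaction only once
    // the grace period has elapsed since its local deletion time.
    // All times are in seconds since the epoch.
    static boolean canPurge(int localDeletionTime, int gcGraceSeconds, int nowInSeconds)
    {
        int gcBefore = nowInSeconds - gcGraceSeconds;
        return localDeletionTime <= gcBefore;
    }

    public static void main(String[] args)
    {
        int deletedAt = 1000;
        // One second before the grace period ends: still kept.
        System.out.println(canPurge(deletedAt, DEFAULT_GC_GRACE_SECONDS, deletedAt + 863999));
        // Grace period over: the next compaction may skip (drop) the column.
        System.out.println(canPurge(deletedAt, DEFAULT_GC_GRACE_SECONDS, deletedAt + 864000));
    }
}
```

For an expiring column, the local deletion time would be its local expiration
time, so the same delay applies after the ttl has run out.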
diff --git a/src/java/org/apache/cassandra/db/Column.java b/src/java/org/apache/cassandra/db/Column.java
index d9401b5..8c42884 100644
--- a/src/java/org/apache/cassandra/db/Column.java
+++ b/src/java/org/apache/cassandra/db/Column.java
@@ -36,7 +36,7 @@
  *  with something like PCollections -- http://code.google.com
  */
 
-public final class Column implements IColumn
+public class Column implements IColumn
 {
     private static Logger logger_ = Logger.getLogger(Column.class);
 
@@ -176,7 +176,7 @@
         try
         {
             buffer.writeLong(timestamp);
-            buffer.writeBoolean(isMarkedForDelete);
+            buffer.writeByte((isMarkedForDelete) ? ColumnSerializer.DELETION_MASK : 0);
         }
         catch (IOException e)
         {
diff --git a/src/java/org/apache/cassandra/db/ColumnSerializer.java b/src/java/org/apache/cassandra/db/ColumnSerializer.java
index 814513d..dc8d025 100644
--- a/src/java/org/apache/cassandra/db/ColumnSerializer.java
+++ b/src/java/org/apache/cassandra/db/ColumnSerializer.java
@@ -28,6 +28,9 @@ import org.apache.cassandra.utils.FBUtilities;
 
 public class ColumnSerializer implements ICompactSerializer2<IColumn>
 {
+    public final static int DELETION_MASK = 0x01;
+    public final static int EXPIRATION_MASK = 0x02;
+
     public static void writeName(byte[] name, DataOutput out)
     {
         int length = name.length;
@@ -61,7 +64,13 @@ public class ColumnSerializer implements ICompactSerializer2<IColumn>
         ColumnSerializer.writeName(column.name(), dos);
         try
         {
-            dos.writeBoolean(column.isMarkedForDelete());
+            if (column instanceof ExpiringColumn) {
+              dos.writeByte(EXPIRATION_MASK);
+              dos.writeInt(((ExpiringColumn) column).getTimeToLive());
+              dos.writeInt(column.getLocalDeletionTime());
+            } else {
+              dos.writeByte((column.isMarkedForDelete()) ? DELETION_MASK : 0);
+            }
             dos.writeLong(column.timestamp());
             FBUtilities.writeByteArray(column.value(), dos);
         }
@@ -74,18 +83,39 @@ public class ColumnSerializer implements ICompactSerializer2<IColumn>
     public Column deserialize(DataInput dis) throws IOException
     {
         byte[] name = ColumnSerializer.readName(dis);
-        boolean delete = dis.readBoolean();
-        long ts = dis.readLong();
-        int length = dis.readInt();
-        if (length < 0)
+        int b = dis.readUnsignedByte();
+        if (FBUtilities.testBitUsingBitMask(b, EXPIRATION_MASK))
         {
-            throw new IOException("Corrupt (negative) value length encountered");
+            int ttl = dis.readInt();
+            int expiration = dis.readInt();
+            long ts = dis.readLong();
+            int length = dis.readInt();
+            if (length < 0)
+            {
+                throw new IOException("Corrupt (negative) value length encountered");
+            }
+            byte[] value = new byte[length];
+            if (length > 0)
+            {
+                dis.readFully(value);
+            }
+            return new ExpiringColumn(name, value, ts, ttl, expiration);
         }
-        byte[] value = new byte[length];
-        if (length > 0)
+        else
         {
-            dis.readFully(value);
+            boolean delete = FBUtilities.testBitUsingBitMask(b, DELETION_MASK);
+            long ts = dis.readLong();
+            int length = dis.readInt();
+            if (length < 0)
+            {
+                throw new IOException("Corrupt (negative) value length encountered");
+            }
+            byte[] value = new byte[length];
+            if (length > 0)
+            {
+                dis.readFully(value);
+            }
+            return new Column(name, value, ts, delete);
         }
-        return new Column(name, value, ts, delete);
     }
 }
diff --git a/src/java/org/apache/cassandra/db/ExpiringColumn.java b/src/java/org/apache/cassandra/db/ExpiringColumn.java
new file mode 100644
index 0000000..8433189
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/ExpiringColumn.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.security.MessageDigest;
+import java.io.IOException;
+
+import org.apache.log4j.Logger;
+
+import org.apache.cassandra.io.DataOutputBuffer;
+
+
+/**
+ * Alternative to Column that has an expiration time.
+ * ExpiringColumn is immutable (as Column is).
+ */
+public class ExpiringColumn extends Column
+{
+    private static Logger logger_ = Logger.getLogger(ExpiringColumn.class);
+
+    private final int localExpirationTime;
+    private final int timeToLive;
+
+    public ExpiringColumn(byte[] name, byte[] value, long timestamp, int timeToLive)
+    {
+      this(name, value, timestamp, timeToLive, (int) (System.currentTimeMillis() / 1000) + timeToLive);
+    }
+
+    public ExpiringColumn(byte[] name, byte[] value, long timestamp, int timeToLive, int localExpirationTime)
+    {
+        super(name, value, timestamp);
+        assert timeToLive != 0;
+        assert localExpirationTime != 0;
+        this.timeToLive = timeToLive;
+        this.localExpirationTime = localExpirationTime;
+    }
+
+    public int getTimeToLive()
+    {
+        return timeToLive;
+    }
+
+    @Override
+    public boolean isMarkedForDelete()
+    {
+        return ((int) (System.currentTimeMillis() / 1000 ) > localExpirationTime);
+    }
+
+    @Override
+    public long getMarkedForDeleteAt()
+    {
+        if (!isMarkedForDelete())
+        {
+            throw new IllegalStateException("column is not marked for delete");
+        }
+        return timestamp() + timeToLive * 1000L;
+    }
+
+    @Override
+    public int size()
+    {
+        /*
+         * Size of a column is =
+         *   size of a name (UtfPrefix + length of the string)
+         * + 1 byte to indicate if the column has been deleted & that it is an expiring one
+         * + 8 bytes for timestamp
+         * + 4 bytes for the localExpirationTime
+         * + 4 bytes for the timeToLive
+         * + 4 bytes which basically indicates the size of the byte array
+         * + entire byte array.
+        */
+
+        /*
+           * We store the string as UTF-8 encoded, so when we calculate the length, it
+           * should be converted to UTF-8.
+           */
+        return IColumn.UtfPrefix_ + name().length + DBConstants.boolSize_ + DBConstants.tsSize_ + DBConstants.intSize_ + DBConstants.intSize_ + DBConstants.intSize_ + value().length;
+    }
+
+    @Override
+    public void updateDigest(MessageDigest digest)
+    {
+        digest.update(name());
+        digest.update(value());
+        DataOutputBuffer buffer = new DataOutputBuffer();
+        try
+        {
+            buffer.writeLong(timestamp());
+            buffer.writeByte(ColumnSerializer.EXPIRATION_MASK);
+            buffer.writeInt(timeToLive);
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException(e);
+        }
+        digest.update(buffer.getData(), 0, buffer.getLength());
+    }
+
+    @Override
+    public int getLocalDeletionTime()
+    {
+        return localExpirationTime;
+    }
+}
+
diff --git a/src/java/org/apache/cassandra/utils/FBUtilities.java b/src/java/org/apache/cassandra/utils/FBUtilities.java
index 9e48e1f..6144bf3 100644
--- a/src/java/org/apache/cassandra/utils/FBUtilities.java
+++ b/src/java/org/apache/cassandra/utils/FBUtilities.java
@@ -342,4 +342,17 @@ public class FBUtilities
             i = j;
         }
     }
+
+    /**
+     * Test if a particular bit is set using a bit mask.
+     *
+     * @param v the value in which a bit must be tested
+     * @param mask the bit mask used to select a bit of <code>v</code>
+     * @return true if the bit of <code>v</code> selected by <code>mask</code>
+     * is set, false otherwise.
+     */
+    public static boolean testBitUsingBitMask(int v, int mask)
+    {
+        return (v & mask) != 0;
+    }
 }
-- 
1.6.3.3
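
The on-disk layout introduced by the patch above replaces the boolean deletion
marker with a flag byte, followed by the ttl and local expiration time when the
column is an expiring one. The following standalone sketch shows that layout
with plain java.io streams. It is a simplified illustration, not the patched
ColumnSerializer: FlagByteSketch, write and describe are made-up names, the
column name is left out, and the value is assumed to be stored as an int length
followed by the bytes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical, simplified illustration of the flag-byte layout.
final class FlagByteSketch
{
    static final int DELETION_MASK = 0x01;
    static final int EXPIRATION_MASK = 0x02;

    // Writes: flags, [ttl, expiration] if ttl > 0, timestamp, value length, value.
    static byte[] write(boolean deleted, int ttl, int expiration, long timestamp, byte[] value)
    {
        try
        {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            if (ttl > 0)
            {
                out.writeByte(EXPIRATION_MASK);
                out.writeInt(ttl);
                out.writeInt(expiration);
            }
            else
            {
                out.writeByte(deleted ? DELETION_MASK : 0);
            }
            out.writeLong(timestamp);
            out.writeInt(value.length);
            out.write(value);
            out.flush();
            return bytes.toByteArray();
        }
        catch (IOException e)
        {
            throw new UncheckedIOException(e);
        }
    }

    // Reads the flag byte first, then decides which fields follow it.
    static String describe(byte[] serialized)
    {
        try
        {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(serialized));
            int flags = in.readUnsignedByte();
            StringBuilder sb = new StringBuilder();
            if ((flags & EXPIRATION_MASK) != 0)
            {
                sb.append("expiring ttl=").append(in.readInt());
                sb.append(" expiration=").append(in.readInt());
            }
            else
            {
                sb.append((flags & DELETION_MASK) != 0 ? "deleted" : "live");
            }
            sb.append(" ts=").append(in.readLong());
            byte[] value = new byte[in.readInt()];
            in.readFully(value);
            sb.append(" valueLength=").append(value.length);
            return sb.toString();
        }
        catch (IOException e)
        {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args)
    {
        byte[] expiring = write(false, 60, 1000, 42L, new byte[] { 1, 2, 3 });
        System.out.println(expiring.length + " bytes: " + describe(expiring));
    }
}
```

An expiring column costs 8 bytes more than a standard one (two extra ints),
which matches the two additional intSize_ terms in ExpiringColumn.size().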

diff --git a/interface/cassandra.thrift b/interface/cassandra.thrift
index 0414b4a..018994e 100644
--- a/interface/cassandra.thrift
+++ b/interface/cassandra.thrift
@@ -57,11 +57,15 @@ const string VERSION = "1.0.0"
  *        is used as a key to its value.
  * @param value. Some data
  * @param timestamp. Used to record when data was sent to be written.
+ * @param ttl. A delay (in seconds) after which the column will be automatically deleted. If this parameter is not
+ *             provided or is <= 0, the column will never be deleted automatically (and will have no ttl when queried).
+ *             Note that, if set, the column will be deleted from a node ttl seconds after the column reaches the node.
  */
 struct Column {
    1: required binary name,
    2: required binary value,
    3: required i64 timestamp,
+   4: optional i32 ttl,
 }
 
 /** A named list of columns.
@@ -323,16 +327,13 @@ service Cassandra {
   # modification methods
 
   /**
-    Insert a Column consisting of (column_path.column, value, timestamp) at the given column_path.column_family and optional
-    column_path.super_column. Note that column_path.column is here required, since a SuperColumn cannot directly contain binary
-    values -- it can only contain sub-Columns. 
+    Insert a Column at the given column_parent.column_family and optional column_parent.super_column.
    */
   void insert(1:required string keyspace, 
               2:required string key, 
-              3:required ColumnPath column_path, 
-              4:required binary value, 
-              5:required i64 timestamp, 
-              6:required ConsistencyLevel consistency_level=0)
+              3:required ColumnParent column_parent,
+              4:required Column column,
+              5:required ConsistencyLevel consistency_level=0)
        throws (1:InvalidRequestException ire, 2:UnavailableException ue, 3:TimedOutException te),
 
   /**
diff --git a/src/java/org/apache/cassandra/cli/CliClient.java b/src/java/org/apache/cassandra/cli/CliClient.java
index 541d81e..3fa7841 100644
--- a/src/java/org/apache/cassandra/cli/CliClient.java
+++ b/src/java/org/apache/cassandra/cli/CliClient.java
@@ -372,8 +372,8 @@ public class CliClient
         }
         
         // do the insert
-        thriftClient_.insert(tableName, key, new ColumnPath(columnFamily, superColumnName, columnName),
-                             value.getBytes(), System.currentTimeMillis(), ConsistencyLevel.ONE);
+        thriftClient_.insert(tableName, key, new ColumnParent(columnFamily, superColumnName),
+                             new Column(columnName, value.getBytes(), System.currentTimeMillis()), ConsistencyLevel.ONE);
         
         css_.out.println("Value inserted.");
     }
diff --git a/src/java/org/apache/cassandra/db/ColumnFamily.java b/src/java/org/apache/cassandra/db/ColumnFamily.java
index 737980f..3671b77 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamily.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamily.java
@@ -159,6 +159,30 @@ public class ColumnFamily implements IColumnContainer
         addColumn(path, value, timestamp, false);
     }
 
+    public void addColumn(QueryPath path, byte[] value, long timestamp, int timeToLive)
+    {
+        if (timeToLive > 0)
+        {
+            assert path.columnName != null : path;
+            IColumn column;
+            if (path.superColumnName == null)
+            {
+              column = new ExpiringColumn(path.columnName, value, timestamp, timeToLive);
+            }
+            else
+            {
+              assert isSuper();
+              column = new SuperColumn(path.superColumnName, getSubComparator());
+              column.addColumn(new ExpiringColumn(path.columnName, value, timestamp, timeToLive)); // checks subcolumn name
+            }
+            addColumn(column);
+        }
+        else
+        {
+            addColumn(path, value, timestamp, false);
+        }
+    }
+
     /** In most places the CF must be part of a QueryPath but here it is ignored. */
     public void addColumn(QueryPath path, byte[] value, long timestamp, boolean deleted)
 	{
diff --git a/src/java/org/apache/cassandra/db/RowMutation.java b/src/java/org/apache/cassandra/db/RowMutation.java
index 5ddb8b1..fb7e3bb 100644
--- a/src/java/org/apache/cassandra/db/RowMutation.java
+++ b/src/java/org/apache/cassandra/db/RowMutation.java
@@ -145,18 +145,24 @@ public class RowMutation
      * param @ cf - column name as <column family>:<column>
      * param @ value - value associated with the column
      * param @ timestamp - timestamp associated with this data.
+     * param @ timeToLive - ttl for the column, 0 for standard (non expiring) columns
     */
-    public void add(QueryPath path, byte[] value, long timestamp)
+    public void add(QueryPath path, byte[] value, long timestamp, int timeToLive)
     {
         ColumnFamily columnFamily = modifications_.get(path.columnFamilyName);
         if (columnFamily == null)
         {
             columnFamily = ColumnFamily.create(table_, path.columnFamilyName);
         }
-        columnFamily.addColumn(path, value, timestamp);
+        columnFamily.addColumn(path, value, timestamp, timeToLive);
         modifications_.put(path.columnFamilyName, columnFamily);
     }
 
+    public void add(QueryPath path, byte[] value, long timestamp)
+    {
+        add(path, value, timestamp, 0);
+    }
+
     public void delete(QueryPath path, long timestamp)
     {
         assert path.columnFamilyName != null;
@@ -253,13 +259,13 @@ public class RowMutation
                     assert cosc.super_column != null;
                     for (org.apache.cassandra.service.Column column : cosc.super_column.columns)
                     {
-                        rm.add(new QueryPath(cfName, cosc.super_column.name, column.name), column.value, column.timestamp);
+                        rm.add(new QueryPath(cfName, cosc.super_column.name, column.name), column.value, column.timestamp, column.getTtl());
                     }
                 }
                 else
                 {
                     assert cosc.super_column == null;
-                    rm.add(new QueryPath(cfName, null, cosc.column.name), cosc.column.value, cosc.column.timestamp);
+                    rm.add(new QueryPath(cfName, null, cosc.column.name), cosc.column.value, cosc.column.timestamp, cosc.column.getTtl());
                 }
             }
         }
diff --git a/src/java/org/apache/cassandra/service/CassandraServer.java b/src/java/org/apache/cassandra/service/CassandraServer.java
index c03727c..576afb7 100644
--- a/src/java/org/apache/cassandra/service/CassandraServer.java
+++ b/src/java/org/apache/cassandra/service/CassandraServer.java
@@ -111,6 +111,10 @@ public class CassandraServer implements Cassandra.Iface
                 continue;
             }
             Column thrift_column = new Column(column.name(), column.value(), column.timestamp());
+            if (column instanceof ExpiringColumn)
+            {
+                thrift_column.setTtl(((ExpiringColumn) column).getTimeToLive());
+            }
             thriftColumns.add(thrift_column);
         }
 
@@ -127,6 +131,10 @@ public class CassandraServer implements Cassandra.Iface
                 continue;
             }
             Column thrift_column = new Column(column.name(), column.value(), column.timestamp());
+            if (column instanceof ExpiringColumn)
+            {
+                thrift_column.setTtl(((ExpiringColumn) column).getTimeToLive());
+            }
             thriftColumns.add(new ColumnOrSuperColumn(thrift_column, null));
         }
 
@@ -332,9 +340,19 @@ public class CassandraServer implements Cassandra.Iface
                 }
                 else
                 {
-                    columnorsupercolumn = column instanceof org.apache.cassandra.db.Column
-                                          ? new ColumnOrSuperColumn(new Column(column.name(), column.value(), column.timestamp()), null)
-                                          : new ColumnOrSuperColumn(null, new SuperColumn(column.name(), thriftifySubColumns(column.getSubColumns())));
+                    if (column instanceof org.apache.cassandra.db.Column)
+                    {
+                      Column thrift_column = new Column(column.name(), column.value(), column.timestamp());
+                      if (column instanceof ExpiringColumn)
+                      {
+                          thrift_column.setTtl(((ExpiringColumn) column).getTimeToLive());
+                      }
+                      columnorsupercolumn = new ColumnOrSuperColumn(thrift_column, null);
+                    }
+                    else
+                    {
+                      columnorsupercolumn = new ColumnOrSuperColumn(null, new SuperColumn(column.name(), thriftifySubColumns(column.getSubColumns())));
+                    }
                 }
 
             }
@@ -389,18 +407,19 @@ public class CassandraServer implements Cassandra.Iface
         return columnFamiliesMap;
     }
 
-    public void insert(String table, String key, ColumnPath column_path, byte[] value, long timestamp, int consistency_level)
+    public void insert(String table, String key, ColumnParent column_parent, Column column, int consistency_level)
     throws InvalidRequestException, UnavailableException, TimedOutException
     {
         if (logger.isDebugEnabled())
             logger.debug("insert");
         ThriftValidation.validateKey(key);
-        ThriftValidation.validateColumnPath(table, column_path);
+        ThriftValidation.validateColumnParent(table, column_parent);
+        ThriftValidation.validateColumn(table, column_parent, column.name);
 
         RowMutation rm = new RowMutation(table, key);
         try
         {
-            rm.add(new QueryPath(column_path), value, timestamp);
+            rm.add(new QueryPath(column_parent.column_family, column_parent.super_column, column.name), column.value, column.timestamp, Math.max(column.ttl, 0));
         }
         catch (MarshalException e)
         {
diff --git a/src/java/org/apache/cassandra/service/ThriftValidation.java b/src/java/org/apache/cassandra/service/ThriftValidation.java
index ddda241..e4da017 100644
--- a/src/java/org/apache/cassandra/service/ThriftValidation.java
+++ b/src/java/org/apache/cassandra/service/ThriftValidation.java
@@ -206,6 +206,11 @@ public class ThriftValidation
         }
     }
 
+    public static void validateColumn(String keyspace, ColumnParent column_parent, byte[] column_name) throws InvalidRequestException
+    {
+        validateColumns(keyspace, column_parent, Arrays.asList(column_name));
+    }
+
     public static void validatePredicate(String keyspace, ColumnParent column_parent, SlicePredicate predicate)
             throws InvalidRequestException
     {
diff --git a/test/unit/org/apache/cassandra/client/TestRingCache.java b/test/unit/org/apache/cassandra/client/TestRingCache.java
index 7c9eb2a..09bb191 100644
--- a/test/unit/org/apache/cassandra/client/TestRingCache.java
+++ b/test/unit/org/apache/cassandra/client/TestRingCache.java
@@ -22,6 +22,7 @@ import java.util.List;
 
 import org.apache.cassandra.service.Cassandra;
 import org.apache.cassandra.service.Column;
+import org.apache.cassandra.service.ColumnParent;
 import org.apache.cassandra.service.ColumnPath;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.thrift.protocol.TBinaryProtocol;
@@ -65,7 +66,9 @@ public class TestRingCache
         for (int nRows=1; nRows<10; nRows++)
         {
             String row = "row" + nRows;
-            ColumnPath col = new ColumnPath("Standard1", null, "col1".getBytes());
+            ColumnParent parent = new ColumnParent("Standard1", null);
+            byte[] columnName = "col1".getBytes();
+            ColumnPath path = new ColumnPath("Standard1", null, columnName);
 
             List<InetAddress> endPoints = ringCache.getEndPoint(row);
             String hosts="";
@@ -75,8 +78,8 @@ public class TestRingCache
         
             // now, read the row back directly from the host owning the row locally
             setup(endPoints.get(0).getHostAddress(), DatabaseDescriptor.getThriftPort());
-            thriftClient.insert(table, row, col, "val1".getBytes(), 1, 1);
-            Column column=thriftClient.get(table, row, col, 1).column;
+            thriftClient.insert(table, row, parent, new Column(columnName, "val1".getBytes(), 1), 1);
+            Column column=thriftClient.get(table, row, path, 1).column;
             System.out.println("read row " + row + " " + new String(column.name) + ":" + new String(column.value) + ":" + column.timestamp);
         }
         System.exit(1);
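
How the second patch treats the optional ttl field can be summarized with a
small standalone sketch: a ttl that is missing or <= 0 yields a standard
column, and a positive ttl is turned into a local expiration time when the
column reaches the node. TtlSketch and its methods are made-up names, and
using 0 as the "no expiration" marker is an assumption made for the
illustration.

```java
// Hypothetical illustration of the ttl handling; not code from the patch.
final class TtlSketch
{
    // Marker used in this sketch for "standard column, never expires".
    static final int NO_EXPIRATION = 0;

    // Mirrors Math.max(column.ttl, 0) in the patched CassandraServer.insert().
    static int normalizeTtl(int requestedTtl)
    {
        return Math.max(requestedTtl, 0);
    }

    // Local expiration time in seconds since the epoch, computed from the
    // node's own clock at the moment the column arrives.
    static int localExpiration(int requestedTtl, long nowMillis)
    {
        int ttl = normalizeTtl(requestedTtl);
        if (ttl == 0)
        {
            return NO_EXPIRATION;
        }
        return (int) (nowMillis / 1000) + ttl;
    }

    // Same strict comparison as ExpiringColumn.isMarkedForDelete().
    static boolean isExpired(int localExpiration, long nowMillis)
    {
        return localExpiration != NO_EXPIRATION
            && (int) (nowMillis / 1000) > localExpiration;
    }

    public static void main(String[] args)
    {
        long now = 100_000L; // 100 seconds after the epoch, in milliseconds
        System.out.println(localExpiration(-5, now));   // 0: standard column
        System.out.println(localExpiration(30, now));   // 130
        System.out.println(isExpired(130, 131_000L));   // true
    }
}
```

Because the expiration is computed locally, two replicas that receive the same
column at different times may expire it at slightly different moments.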
