TTL support for BulkOutputFormat. Patch by Samarth Gahire and brandonwilliams, reviewed by brandonwilliams for CASSANDRA-3754
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/fdd3721f Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/fdd3721f Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/fdd3721f Branch: refs/heads/trunk Commit: fdd3721fbaee64e404feaa72806ff89d04878b4e Parents: 6c2ded6 Author: Brandon Williams <[email protected]> Authored: Mon Feb 6 12:20:52 2012 -0600 Committer: Brandon Williams <[email protected]> Committed: Mon Feb 6 12:20:52 2012 -0600 ---------------------------------------------------------------------- .../apache/cassandra/hadoop/BulkRecordWriter.java | 15 ++++++++++++--- 1 files changed, 12 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/fdd3721f/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java b/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java index 573ec8f..bd2bdbc 100644 --- a/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java +++ b/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java @@ -148,7 +148,12 @@ implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<Mutation>> if (colType == ColType.COUNTER) writer.addCounterColumn(column.name, column.value.getLong()); else - writer.addColumn(column.name, column.value, column.timestamp); + { + if(0 == column.ttl) + writer.addColumn(column.name, column.value, column.timestamp); + else + writer.addExpiringColumn(column.name, column.value, column.timestamp, column.ttl, System.currentTimeMillis() + (column.ttl * 1000)); + } } } else @@ -156,11 +161,15 @@ implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<Mutation>> if (colType == ColType.COUNTER) writer.addCounterColumn(mut.getColumn_or_supercolumn().column.name, mut.getColumn_or_supercolumn().column.value.getLong()); else - writer.addColumn(mut.getColumn_or_supercolumn().column.name, mut.getColumn_or_supercolumn().column.value, mut.getColumn_or_supercolumn().column.timestamp); + { + if(0 == mut.getColumn_or_supercolumn().column.ttl) + writer.addColumn(mut.getColumn_or_supercolumn().column.name, mut.getColumn_or_supercolumn().column.value, mut.getColumn_or_supercolumn().column.timestamp); + else + writer.addExpiringColumn(mut.getColumn_or_supercolumn().column.name, mut.getColumn_or_supercolumn().column.value, mut.getColumn_or_supercolumn().column.timestamp, mut.getColumn_or_supercolumn().column.ttl, System.currentTimeMillis() + (mut.getColumn_or_supercolumn().column.ttl * 1000)); + } } } } - @Override public void close(TaskAttemptContext context) throws IOException, InterruptedException {
