propagate range filter to ColumnFamilyRecordReader
patch by jbellis; reviewed by brandonwilliams for CASSANDRA-2878
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d51d6f0b Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d51d6f0b Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d51d6f0b Branch: refs/heads/trunk Commit: d51d6f0b43b006b40ec3c1d6d13e2155b58136c0 Parents: b08c675 Author: Jonathan Ellis <[email protected]> Authored: Fri Jan 20 14:05:07 2012 -0600 Committer: Jonathan Ellis <[email protected]> Committed: Mon Jan 23 16:00:44 2012 -0600 ---------------------------------------------------------------------- CHANGES.txt | 1 + NEWS.txt | 1 + .../org/apache/cassandra/db/ColumnFamilyStore.java | 2 + .../cassandra/hadoop/ColumnFamilyInputFormat.java | 2 +- .../cassandra/hadoop/ColumnFamilyRecordReader.java | 6 +++- .../org/apache/cassandra/hadoop/ConfigHelper.java | 25 ++++++++++++-- .../apache/cassandra/thrift/ThriftValidation.java | 7 ---- 7 files changed, 31 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/d51d6f0b/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 4a8ee42..1e51f54 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -20,6 +20,7 @@ * Use faster bytes comparison (CASSANDRA-3434) * Bulk loader is no longer a fat client, (HADOOP) bulk load output format (CASSANDRA-3045) + * (Hadoop) add support for KeyRange.filter * remove assumption that keys and token are in bijection (CASSANDRA-1034, 3574, 3604) * always remove endpoints from delevery queue in HH (CASSANDRA-3546) http://git-wip-us.apache.org/repos/asf/cassandra/blob/d51d6f0b/NEWS.txt ---------------------------------------------------------------------- diff --git a/NEWS.txt b/NEWS.txt index e685b12..0b227f3 100644 --- a/NEWS.txt +++ b/NEWS.txt @@ -55,6 +55,7 @@ Features be pinned to specfic media. 
- Hadoop: a new BulkOutputFormat is included which will directly write SSTables locally and then stream them into the cluster. + - Hadoop: KeyRange.filter is now supported with ColumnFamilyInputFormat - The bulk loader is not longer a fat client; it can be run from an existing machine in a cluster. - A new write survey mode has been added, similar to bootstrap (enabled via http://git-wip-us.apache.org/repos/asf/cassandra/blob/d51d6f0b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index 98223f7..7ff46c9 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -1304,6 +1304,8 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean public List<Row> filter(AbstractScanIterator rowIterator, ExtendedFilter filter) { + if (logger.isDebugEnabled()) + logger.debug("Filtering {} for rows matching {}", rowIterator, filter); List<Row> rows = new ArrayList<Row>(); int columnsCount = 0; try http://git-wip-us.apache.org/repos/asf/cassandra/blob/d51d6f0b/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java index c13e881..8abc460 100644 --- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java +++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java @@ -124,7 +124,7 @@ public class ColumnFamilyInputFormat extends InputFormat<ByteBuffer, SortedMap<B KeyRange jobKeyRange = ConfigHelper.getInputKeyRange(conf); IPartitioner partitioner = null; Range<Token> jobRange = null; - if (jobKeyRange != null) + if (jobKeyRange != null && 
jobKeyRange.start_token != null) { partitioner = ConfigHelper.getInputPartitioner(context.getConfiguration()); assert partitioner.preservesOrder() : "ConfigHelper.setInputKeyRange(..) can only be used with a order preserving paritioner"; http://git-wip-us.apache.org/repos/asf/cassandra/blob/d51d6f0b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java index cbe2b3c..46b767a 100644 --- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java +++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java @@ -68,6 +68,7 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap private Cassandra.Client client; private ConsistencyLevel consistencyLevel; private int keyBufferSize = 8192; + private List<IndexExpression> filter; public ColumnFamilyRecordReader() { @@ -131,6 +132,8 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap { this.split = (ColumnFamilySplit) split; Configuration conf = context.getConfiguration(); + KeyRange jobRange = ConfigHelper.getInputKeyRange(conf); + filter = jobRange == null ? 
null : jobRange.row_filter; predicate = ConfigHelper.getInputSlicePredicate(conf); isEmptyPredicate = isEmptyPredicate(predicate); totalRowCount = ConfigHelper.getInputSplitSize(conf); @@ -283,7 +286,8 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap KeyRange keyRange = new KeyRange(batchRowCount) .setStart_token(startToken) - .setEnd_token(split.getEndToken()); + .setEnd_token(split.getEndToken()) + .setRow_filter(filter); try { rows = client.get_range_slices(new ColumnParent(cfName), http://git-wip-us.apache.org/repos/asf/cassandra/blob/d51d6f0b/src/java/org/apache/cassandra/hadoop/ConfigHelper.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/ConfigHelper.java b/src/java/org/apache/cassandra/hadoop/ConfigHelper.java index 63eec8c..9d89a79 100644 --- a/src/java/org/apache/cassandra/hadoop/ConfigHelper.java +++ b/src/java/org/apache/cassandra/hadoop/ConfigHelper.java @@ -25,10 +25,7 @@ import java.util.List; import org.apache.cassandra.config.ConfigurationException; import org.apache.cassandra.dht.IPartitioner; -import org.apache.cassandra.thrift.Cassandra; -import org.apache.cassandra.thrift.KeyRange; -import org.apache.cassandra.thrift.SlicePredicate; -import org.apache.cassandra.thrift.TBinaryProtocol; +import org.apache.cassandra.thrift.*; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.Hex; import org.apache.hadoop.conf.Configuration; @@ -225,6 +222,26 @@ public class ConfigHelper conf.set(INPUT_KEYRANGE_CONFIG, thriftToString(range)); } + /** + * Set the KeyRange to limit the rows. 
+ * @param conf Job configuration you are about to run + */ + public static void setInputRange(Configuration conf, String startToken, String endToken, List<IndexExpression> filter) + { + KeyRange range = new KeyRange().setStart_token(startToken).setEnd_token(endToken).setRow_filter(filter); + conf.set(INPUT_KEYRANGE_CONFIG, thriftToString(range)); + } + + /** + * Set the KeyRange to limit the rows. + * @param conf Job configuration you are about to run + */ + public static void setInputRange(Configuration conf, List<IndexExpression> filter) + { + KeyRange range = new KeyRange().setRow_filter(filter); + conf.set(INPUT_KEYRANGE_CONFIG, thriftToString(range)); + } + /** may be null if unset */ public static KeyRange getInputKeyRange(Configuration conf) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/d51d6f0b/src/java/org/apache/cassandra/thrift/ThriftValidation.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/thrift/ThriftValidation.java b/src/java/org/apache/cassandra/thrift/ThriftValidation.java index 31035e3..b1ab665 100644 --- a/src/java/org/apache/cassandra/thrift/ThriftValidation.java +++ b/src/java/org/apache/cassandra/thrift/ThriftValidation.java @@ -514,13 +514,6 @@ public class ThriftValidation { throw new InvalidRequestException("super columns are not yet supported for indexing"); } - if (!isEmpty(range.row_filter) && range.start_key == null) - { - // TODO: our current KEYS indexes can't do that efficiently - // (without scanning *all* the keys in the range and simply applying the filter to discard them when they don't match) - // See KeySearcher.search() - throw new InvalidRequestException("filtered queries must use concrete keys rather than tokens"); - } if (range.count <= 0) {
