Updated Branches: refs/heads/cassandra-1.2 0bff5f57d -> f1004e9b1 refs/heads/trunk 8bf6e1559 -> 670954cb3
update CqlRecordWriter interface patch by Alex Liu and jbellis for CASSANDRA-5622 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f1004e9b Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f1004e9b Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f1004e9b Branch: refs/heads/cassandra-1.2 Commit: f1004e9b175d6593b816e72058b2dbea14257a43 Parents: 0bff5f5 Author: Jonathan Ellis <[email protected]> Authored: Mon Jun 17 17:41:58 2013 -0500 Committer: Jonathan Ellis <[email protected]> Committed: Mon Jun 17 17:41:58 2013 -0500 ---------------------------------------------------------------------- CHANGES.txt | 2 +- .../hadoop_cql3_word_count/src/WordCount.java | 9 +- .../AbstractColumnFamilyRecordWriter.java | 4 +- .../hadoop/ColumnFamilyRecordWriter.java | 2 +- .../cassandra/hadoop/cql3/CqlRecordWriter.java | 119 ++++++++++--------- 5 files changed, 67 insertions(+), 69 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/f1004e9b/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index c28d4d7..2bba0ee 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,7 +1,7 @@ 1.2.6 * Reduce SSTableLoader memory usage (CASSANDRA-5555) * Scale hinted_handoff_throttle_in_kb to cluster size (CASSANDRA-5272) - * (Hadoop) Add CQL3 input/output formats (CASSANDRA-4421) + * (Hadoop) Add CQL3 input/output formats (CASSANDRA-4421, 5622) * (Hadoop) Fix InputKeyRange in CFIF (CASSANDRA-5536) * Fix dealing with ridiculously large max sstable sizes in LCS (CASSANDRA-5589) * Ignore pre-truncate hints (CASSANDRA-4655) http://git-wip-us.apache.org/repos/asf/cassandra/blob/f1004e9b/examples/hadoop_cql3_word_count/src/WordCount.java ---------------------------------------------------------------------- diff --git a/examples/hadoop_cql3_word_count/src/WordCount.java b/examples/hadoop_cql3_word_count/src/WordCount.java index 611f9c2..c92f047 100644 --- a/examples/hadoop_cql3_word_count/src/WordCount.java +++ b/examples/hadoop_cql3_word_count/src/WordCount.java @@ -166,9 +166,7 @@ public class WordCount extends Configured implements Tool private List<ByteBuffer> getBindVariables(Text word, int sum) { List<ByteBuffer> variables = new ArrayList<ByteBuffer>(); - variables.add(keys.get("row_id1")); - variables.add(keys.get("row_id2")); - variables.add(ByteBufferUtil.bytes(word.toString())); + keys.put("word", ByteBufferUtil.bytes(word.toString())); variables.add(ByteBufferUtil.bytes(String.valueOf(sum))); return variables; } @@ -210,9 +208,8 @@ public class WordCount extends Configured implements Tool ConfigHelper.setOutputColumnFamily(job.getConfiguration(), KEYSPACE, OUTPUT_COLUMN_FAMILY); job.getConfiguration().set(PRIMARY_KEY, "word,sum"); - String query = "INSERT INTO " + KEYSPACE + "." + OUTPUT_COLUMN_FAMILY + - " (row_id1, row_id2, word, count_num) " + - " values (?, ?, ?, ?)"; + String query = "UPDATE " + KEYSPACE + "." + OUTPUT_COLUMN_FAMILY + + " SET count_num = ? "; CqlConfigHelper.setOutputCql(job.getConfiguration(), query); ConfigHelper.setOutputInitialAddress(job.getConfiguration(), "localhost"); ConfigHelper.setOutputPartitioner(job.getConfiguration(), "Murmur3Partitioner"); http://git-wip-us.apache.org/repos/asf/cassandra/blob/f1004e9b/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyRecordWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyRecordWriter.java b/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyRecordWriter.java index 6428db3..456130d 100644 --- a/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyRecordWriter.java +++ b/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyRecordWriter.java @@ -109,7 +109,7 @@ public abstract class AbstractColumnFamilyRecordWriter<K, Y> extends RecordWrite // The list of endpoints for this range protected final List<InetAddress> endpoints; // A bounded queue of incoming mutations for this range - protected final BlockingQueue<Pair<ByteBuffer, K>> queue = new ArrayBlockingQueue<Pair<ByteBuffer, K>>(queueSize); + protected final BlockingQueue<K> queue = new ArrayBlockingQueue<K>(queueSize); protected volatile boolean run = true; // we want the caller to know if something went wrong, so we record any unrecoverable exception while writing @@ -132,7 +132,7 @@ public abstract class AbstractColumnFamilyRecordWriter<K, Y> extends RecordWrite /** * enqueues the given value to Cassandra */ - public void put(Pair<ByteBuffer, K> value) throws IOException + public void put(K value) throws IOException { while (true) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/f1004e9b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java index 50ec059..6823342 100644 --- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java +++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java @@ -134,7 +134,7 @@ final class ColumnFamilyRecordWriter extends AbstractColumnFamilyRecordWriter<By * A client that runs in a threadpool and connects to the list of endpoints for a particular * range. Mutations for keys in that range are sent to this client via a queue. */ - public class RangeClient extends AbstractRangeClient<Mutation> + public class RangeClient extends AbstractRangeClient<Pair<ByteBuffer, Mutation>> { public final String columnFamily = ConfigHelper.getOutputColumnFamily(conf); http://git-wip-us.apache.org/repos/asf/cassandra/blob/f1004e9b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java index dde6b1f..642d8c4 100644 --- a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java +++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java @@ -20,13 +20,12 @@ package org.apache.cassandra.hadoop.cql3; import java.io.IOException; import java.net.InetAddress; import java.nio.ByteBuffer; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.concurrent.ConcurrentHashMap; -import org.apache.cassandra.thrift.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.db.marshal.CompositeType; import org.apache.cassandra.db.marshal.LongType; @@ -38,15 +37,13 @@ import org.apache.cassandra.exceptions.SyntaxException; import org.apache.cassandra.hadoop.AbstractColumnFamilyRecordWriter; import org.apache.cassandra.hadoop.ConfigHelper; import org.apache.cassandra.hadoop.Progressable; +import org.apache.cassandra.thrift.*; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; -import org.apache.cassandra.utils.Pair; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.thrift.TException; import org.apache.thrift.transport.TTransport; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * The <code>ColumnFamilyRecordWriter</code> maps the output <key, value> @@ -75,7 +72,8 @@ final class CqlRecordWriter extends AbstractColumnFamilyRecordWriter<Map<String, private final String cql; private AbstractType<?> keyValidator; - private String [] partitionkeys; + private String [] partitionKeyColumns; + private List<String> clusterColumns; /** * Upon construction, obtain the map that this writer will use to collect @@ -96,30 +94,30 @@ final class CqlRecordWriter extends AbstractColumnFamilyRecordWriter<Map<String, this.progressable = progressable; } - CqlRecordWriter(Configuration conf) throws IOException + CqlRecordWriter(Configuration conf) { super(conf); this.clients = new HashMap<Range, RangeClient>(); - cql = CqlConfigHelper.getOutputCql(conf); try { - String host = getAnyHost(); - int port = ConfigHelper.getOutputRpcPort(conf); - Cassandra.Client client = CqlOutputFormat.createAuthenticatedClient(host, port, conf); + Cassandra.Client client = ConfigHelper.getClientFromOutputAddressList(conf); retrievePartitionKeyValidator(client); - + String cqlQuery = CqlConfigHelper.getOutputCql(conf).trim(); + if (cqlQuery.toLowerCase().startsWith("insert")) + throw new UnsupportedOperationException("INSERT with CqlRecordWriter is not supported, please use UPDATE/DELETE statement"); + cql = appendKeyWhereClauses(cqlQuery); + if (client != null) { TTransport transport = client.getOutputProtocol().getTransport(); if (transport.isOpen()) transport.close(); - client = null; } } catch (Exception e) { - throw new IOException(e); + throw new RuntimeException(e); } } @@ -161,8 +159,7 @@ final class CqlRecordWriter extends AbstractColumnFamilyRecordWriter<Map<String, @Override public void write(Map<String, ByteBuffer> keyColumns, List<ByteBuffer> values) throws IOException { - ByteBuffer rowKey = getRowKey(keyColumns); - Range<Token> range = ringCache.getRange(rowKey); + Range<Token> range = ringCache.getRange(getPartitionKey(keyColumns)); // get the client for the given range, or create a new one RangeClient client = clients.get(range); @@ -174,7 +171,14 @@ final class CqlRecordWriter extends AbstractColumnFamilyRecordWriter<Map<String, clients.put(range, client); } - client.put(Pair.create(rowKey, values)); + // add primary key columns to the bind variables + List<ByteBuffer> allValues = new ArrayList<ByteBuffer>(values); + for (String column : partitionKeyColumns) + allValues.add(keyColumns.get(column)); + for (String column : clusterColumns) + allValues.add(keyColumns.get(column)); + + client.put(allValues); progressable.progress(); } @@ -201,10 +205,10 @@ final class CqlRecordWriter extends AbstractColumnFamilyRecordWriter<Map<String, outer: while (run || !queue.isEmpty()) { - Pair<ByteBuffer, List<ByteBuffer>> item; + List<ByteBuffer> bindVariables; try { - item = queue.take(); + bindVariables = queue.take(); } catch (InterruptedException e) { @@ -220,16 +224,15 @@ final class CqlRecordWriter extends AbstractColumnFamilyRecordWriter<Map<String, { int i = 0; int itemId = preparedStatement(client); - while (item != null) + while (bindVariables != null) { - List<ByteBuffer> bindVariables = item.right; client.execute_prepared_cql3_query(itemId, bindVariables, ConsistencyLevel.ONE); i++; if (i >= batchThreshold) break; - item = queue.poll(); + bindVariables = queue.poll(); } break; @@ -294,23 +297,22 @@ final class CqlRecordWriter extends AbstractColumnFamilyRecordWriter<Map<String, } } - private ByteBuffer getRowKey(Map<String, ByteBuffer> keyColumns) + private ByteBuffer getPartitionKey(Map<String, ByteBuffer> keyColumns) { - //current row key - ByteBuffer rowKey; + ByteBuffer partitionKey; if (keyValidator instanceof CompositeType) { - ByteBuffer[] keys = new ByteBuffer[partitionkeys.length]; + ByteBuffer[] keys = new ByteBuffer[partitionKeyColumns.length]; for (int i = 0; i< keys.length; i++) - keys[i] = keyColumns.get(partitionkeys[i]); + keys[i] = keyColumns.get(partitionKeyColumns[i]); - rowKey = ((CompositeType) keyValidator).build(keys); + partitionKey = ((CompositeType) keyValidator).build(keys); } else { - rowKey = keyColumns.get(partitionkeys[0]); + partitionKey = keyColumns.get(partitionKeyColumns[0]); } - return rowKey; + return partitionKey; } /** retrieve the key validator from system.schema_columnfamilies table */ @@ -319,7 +321,8 @@ final class CqlRecordWriter extends AbstractColumnFamilyRecordWriter<Map<String, String keyspace = ConfigHelper.getOutputKeyspace(conf); String cfName = ConfigHelper.getOutputColumnFamily(conf); String query = "SELECT key_validator," + - " key_aliases " + + " key_aliases," + + " column_aliases " + "FROM system.schema_columnfamilies " + "WHERE keyspace_name='%s' and columnfamily_name='%s'"; String formatted = String.format(query, keyspace, cfName); @@ -334,16 +337,22 @@ final class CqlRecordWriter extends AbstractColumnFamilyRecordWriter<Map<String, logger.debug("partition keys: " + keyString); List<String> keys = FBUtilities.fromJsonList(keyString); - partitionkeys = new String[keys.size()]; + partitionKeyColumns = new String[keys.size()]; int i = 0; for (String key : keys) { - partitionkeys[i] = key; + partitionKeyColumns[i] = key; i++; } + + Column rawClusterColumns = result.rows.get(0).columns.get(2); + String clusterColumnString = ByteBufferUtil.string(ByteBuffer.wrap(rawClusterColumns.getValue())); + + logger.debug("cluster columns: " + clusterColumnString); + clusterColumns = FBUtilities.fromJsonList(clusterColumnString); } - private AbstractType<?> parseType(String type) throws IOException + private AbstractType<?> parseType(String type) throws ConfigurationException { try { @@ -352,32 +361,24 @@ final class CqlRecordWriter extends AbstractColumnFamilyRecordWriter<Map<String, return LongType.instance; return TypeParser.parse(type); } - catch (ConfigurationException e) - { - throw new IOException(e); - } catch (SyntaxException e) { - throw new IOException(e); + throw new ConfigurationException(e.getMessage(), e); } } - - private String getAnyHost() throws IOException, InvalidRequestException, TException + + /** + * add where clauses for partition keys and cluster columns + */ + private String appendKeyWhereClauses(String cqlQuery) { - Cassandra.Client client = ConfigHelper.getClientFromOutputAddressList(conf); - List<TokenRange> ring = client.describe_ring(ConfigHelper.getOutputKeyspace(conf)); - try - { - for (TokenRange range : ring) - return range.endpoints.get(0); - } - finally - { - TTransport transport = client.getOutputProtocol().getTransport(); - if (transport.isOpen()) - transport.close(); - } - throw new IOException("There are no endpoints"); - } + String keyWhereClause = ""; + + for (String partitionKey : partitionKeyColumns) + keyWhereClause += String.format("%s = ?", keyWhereClause.isEmpty() ? partitionKey : (" AND " + partitionKey)); + for (String clusterColumn : clusterColumns) + keyWhereClause += " AND " + clusterColumn + " = ?"; + return cqlQuery + " WHERE " + keyWhereClause; + } }
