Updated Branches: refs/heads/trunk f02928f88 -> a58b87020
fix BulkLoader recognition of CQL3 columnfamilies patch by Aleksey Yeschenko; reviewed by jbellis for CASSANDRA-4755 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a58b8702 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a58b8702 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a58b8702 Branch: refs/heads/trunk Commit: a58b87020b4315eab48482666450049fb7cf0674 Parents: f02928f Author: Jonathan Ellis <[email protected]> Authored: Mon Oct 22 11:48:07 2012 -0500 Committer: Jonathan Ellis <[email protected]> Committed: Mon Oct 22 11:48:07 2012 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../org/apache/cassandra/tools/BulkLoader.java | 32 +++++++------- 2 files changed, 17 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/a58b8702/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index e14ffa7..4026428 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 1.2-beta2 + * fix BulkLoader recognition of CQL3 columnfamilies (CASSANDRA-4755) * Sort commitlog segments for replay by id instead of mtime (CASSANDRA-4793) * Make hint delivery asynchronous (CASSANDRA-4761) * Pluggable Thrift transport factories for CLI and cqlsh (CASSANDRA-4609, 4610) http://git-wip-us.apache.org/repos/asf/cassandra/blob/a58b8702/src/java/org/apache/cassandra/tools/BulkLoader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/BulkLoader.java b/src/java/org/apache/cassandra/tools/BulkLoader.java index f2f018f..c838cb0 100644 --- a/src/java/org/apache/cassandra/tools/BulkLoader.java +++ b/src/java/org/apache/cassandra/tools/BulkLoader.java @@ -23,15 +23,19 @@ import java.net.InetAddress; import java.net.UnknownHostException; import java.util.*; +import org.apache.commons.cli.*; + import org.apache.cassandra.auth.IAuthenticator; import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.SystemTable; +import org.apache.cassandra.db.Table; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.io.sstable.SSTableLoader; import org.apache.cassandra.streaming.PendingFile; import org.apache.cassandra.thrift.*; +import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.OutputHandler; -import org.apache.commons.cli.*; import org.apache.thrift.protocol.TBinaryProtocol; import org.apache.thrift.protocol.TProtocol; import org.apache.thrift.transport.TFramedTransport; @@ -162,7 +166,7 @@ public class BulkLoader sb.append("[total: ").append(totalSize == 0 ? 100L : totalProgress * 100L / totalSize).append(" - "); sb.append(mbPerSec(deltaProgress, deltaTime)).append("MB/s"); - sb.append(" (avg: ").append(mbPerSec(totalProgress, time - startTime)).append("MB/s)]");; + sb.append(" (avg: ").append(mbPerSec(totalProgress, time - startTime)).append("MB/s)]"); System.out.print(sb.toString()); return done; } @@ -176,7 +180,7 @@ public class BulkLoader static class ExternalClient extends SSTableLoader.Client { - private final Map<String, Set<String>> knownCfs = new HashMap<String, Set<String>>(); + private final Set<String> knownCfs = new HashSet<String>(); private final Set<InetAddress> hosts; private final int rpcPort; private final String user; @@ -198,17 +202,14 @@ public class BulkLoader { try { - // Query endpoint to ranges map and schemas from thrift InetAddress host = hostiter.next(); Cassandra.Client client = createThriftClient(host.getHostAddress(), rpcPort, this.user, this.passwd); - List<TokenRange> tokenRanges = client.describe_ring(keyspace); - List<KsDef> ksDefs = client.describe_keyspaces(); setPartitioner(client.describe_partitioner()); Token.TokenFactory tkFactory = getPartitioner().getTokenFactory(); - for (TokenRange tr : tokenRanges) + for (TokenRange tr : client.describe_ring(keyspace)) { Range<Token> range = new Range<Token>(tkFactory.fromString(tr.start_token), tkFactory.fromString(tr.end_token)); for (String ep : tr.endpoints) @@ -217,13 +218,13 @@ public class BulkLoader } } - for (KsDef ksDef : ksDefs) - { - Set<String> cfs = new HashSet<String>(); - for (CfDef cfDef : ksDef.cf_defs) - cfs.add(cfDef.name); - knownCfs.put(ksDef.name, cfs); - } + String query = String.format("SELECT columnfamily_name FROM %s.%s WHERE keyspace_name = '%s'", + Table.SYSTEM_KS, + SystemTable.SCHEMA_COLUMNFAMILIES_CF, + keyspace); + CqlResult result = client.execute_cql3_query(ByteBufferUtil.bytes(query), Compression.NONE, ConsistencyLevel.ONE); + for (CqlRow row : result.rows) + knownCfs.add(new String(row.getColumns().get(0).getValue(), "UTF8")); break; } catch (Exception e) @@ -236,8 +237,7 @@ public class BulkLoader public boolean validateColumnFamily(String keyspace, String cfName) { - Set<String> cfs = knownCfs.get(keyspace); - return cfs != null && cfs.contains(cfName); + return knownCfs.contains(cfName); } private static Cassandra.Client createThriftClient(String host, int port, String user, String passwd) throws Exception
