Repository: cassandra Updated Branches: refs/heads/trunk 5b6154531 -> f698cc228
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/src/java/org/apache/cassandra/tools/BulkLoader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/BulkLoader.java b/src/java/org/apache/cassandra/tools/BulkLoader.java index 22921e2..51e5e3d 100644 --- a/src/java/org/apache/cassandra/tools/BulkLoader.java +++ b/src/java/org/apache/cassandra/tools/BulkLoader.java @@ -18,33 +18,27 @@ package org.apache.cassandra.tools; import java.io.File; +import java.io.IOException; import java.net.InetAddress; import java.net.MalformedURLException; import java.net.UnknownHostException; import java.util.*; -import com.google.common.base.Joiner; +import com.google.common.base.Optional; import com.google.common.collect.HashMultimap; import com.google.common.collect.Multimap; import org.apache.commons.cli.*; -import org.apache.cassandra.auth.PasswordAuthenticator; +import com.datastax.driver.core.SSLOptions; +import javax.net.ssl.SSLContext; import org.apache.cassandra.config.*; -import org.apache.cassandra.db.SystemKeyspace; -import org.apache.cassandra.db.marshal.UTF8Type; -import org.apache.cassandra.dht.Range; -import org.apache.cassandra.dht.Token; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.io.sstable.SSTableLoader; -import org.apache.cassandra.schema.LegacySchemaTables; +import org.apache.cassandra.security.SSLFactory; import org.apache.cassandra.streaming.*; -import org.apache.cassandra.thrift.*; -import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.JVMStabilityInspector; +import org.apache.cassandra.utils.NativeSSTableLoaderClient; import org.apache.cassandra.utils.OutputHandler; -import org.apache.thrift.protocol.TBinaryProtocol; -import org.apache.thrift.protocol.TProtocol; -import org.apache.thrift.transport.TTransport; public class BulkLoader { @@ -54,7 +48,7 @@ public class BulkLoader private static final 
String NOPROGRESS_OPTION = "no-progress"; private static final String IGNORE_NODES_OPTION = "ignore"; private static final String INITIAL_HOST_ADDRESS_OPTION = "nodes"; - private static final String RPC_PORT_OPTION = "port"; + private static final String NATIVE_PORT_OPTION = "port"; private static final String USER_OPTION = "username"; private static final String PASSWD_OPTION = "password"; private static final String THROTTLE_MBITS = "throttle"; @@ -82,13 +76,13 @@ public class BulkLoader options.directory, new ExternalClient( options.hosts, - options.rpcPort, + options.nativePort, options.user, options.passwd, - options.transportFactory, options.storagePort, options.sslStoragePort, - options.serverEncOptions), + options.serverEncOptions, + buildSSLOptions((EncryptionOptions.ClientEncryptionOptions)options.encOptions)), handler, options.connectionsPerHost); DatabaseDescriptor.setStreamThroughputOutboundMegabitsPerSec(options.throttle); @@ -154,8 +148,13 @@ public class BulkLoader start = lastTime = System.nanoTime(); } - public void onSuccess(StreamState finalState) {} - public void onFailure(Throwable t) {} + public void onSuccess(StreamState finalState) + { + } + + public void onFailure(Throwable t) + { + } public synchronized void handleStreamEvent(StreamEvent event) { @@ -254,14 +253,27 @@ public class BulkLoader } } - static class ExternalClient extends SSTableLoader.Client + private static SSLOptions buildSSLOptions(EncryptionOptions.ClientEncryptionOptions clientEncryptionOptions) + { + + if (!clientEncryptionOptions.enabled) + return null; + + SSLContext sslContext; + try + { + sslContext = SSLFactory.createSSLContext(clientEncryptionOptions, true); + } + catch (IOException e) + { + throw new RuntimeException("Could not create SSL Context.", e); + } + + return new SSLOptions(sslContext, clientEncryptionOptions.cipher_suites); + } + + static class ExternalClient extends NativeSSTableLoaderClient { - private final Map<String, CFMetaData> knownCfs = new 
HashMap<>(); - private final Set<InetAddress> hosts; - private final int rpcPort; - private final String user; - private final String passwd; - private final ITransportFactory transportFactory; private final int storagePort; private final int sslStoragePort; private final EncryptionOptions.ServerEncryptionOptions serverEncOptions; @@ -270,103 +282,22 @@ public class BulkLoader int port, String user, String passwd, - ITransportFactory transportFactory, int storagePort, int sslStoragePort, - EncryptionOptions.ServerEncryptionOptions serverEncryptionOptions) + EncryptionOptions.ServerEncryptionOptions serverEncryptionOptions, + SSLOptions sslOptions) { - super(); - this.hosts = hosts; - this.rpcPort = port; - this.user = user; - this.passwd = passwd; - this.transportFactory = transportFactory; + super(hosts, port, user, passwd, sslOptions); this.storagePort = storagePort; this.sslStoragePort = sslStoragePort; this.serverEncOptions = serverEncryptionOptions; } @Override - public void init(String keyspace) - { - Iterator<InetAddress> hostiter = hosts.iterator(); - while (hostiter.hasNext()) - { - try - { - // Query endpoint to ranges map and schemas from thrift - InetAddress host = hostiter.next(); - Cassandra.Client client = createThriftClient(host.getHostAddress(), rpcPort, this.user, this.passwd, this.transportFactory); - - setPartitioner(client.describe_partitioner()); - Token.TokenFactory tkFactory = getPartitioner().getTokenFactory(); - - for (TokenRange tr : client.describe_ring(keyspace)) - { - Range<Token> range = new Range<>(tkFactory.fromString(tr.start_token), tkFactory.fromString(tr.end_token)); - for (String ep : tr.endpoints) - { - addRangeForEndpoint(range, InetAddress.getByName(ep)); - } - } - - String cfQuery = String.format("SELECT * FROM %s.%s WHERE keyspace_name = '%s'", - SystemKeyspace.NAME, - LegacySchemaTables.COLUMNFAMILIES, - keyspace); - CqlResult cfRes = client.execute_cql3_query(ByteBufferUtil.bytes(cfQuery), Compression.NONE, 
ConsistencyLevel.ONE); - - - for (CqlRow row : cfRes.rows) - { - String columnFamily = UTF8Type.instance.getString(row.columns.get(1).bufferForName()); - String columnsQuery = String.format("SELECT * FROM %s.%s WHERE keyspace_name = '%s' AND columnfamily_name = '%s'", - SystemKeyspace.NAME, - LegacySchemaTables.COLUMNS, - keyspace, - columnFamily); - CqlResult columnsRes = client.execute_cql3_query(ByteBufferUtil.bytes(columnsQuery), Compression.NONE, ConsistencyLevel.ONE); - - CFMetaData metadata = ThriftConversion.fromThriftCqlRow(row, columnsRes); - knownCfs.put(metadata.cfName, metadata); - } - break; - } - catch (Exception e) - { - if (!hostiter.hasNext()) - throw new RuntimeException("Could not retrieve endpoint ranges: ", e); - } - } - } - - @Override public StreamConnectionFactory getConnectionFactory() { return new BulkLoadConnectionFactory(storagePort, sslStoragePort, serverEncOptions, false); } - - @Override - public CFMetaData getCFMetaData(String keyspace, String cfName) - { - return knownCfs.get(cfName); - } - - private static Cassandra.Client createThriftClient(String host, int port, String user, String passwd, ITransportFactory transportFactory) throws Exception - { - TTransport trans = transportFactory.openTransport(host, port); - TProtocol protocol = new TBinaryProtocol(trans); - Cassandra.Client client = new Cassandra.Client(protocol); - if (user != null && passwd != null) - { - Map<String, String> credentials = new HashMap<>(); - credentials.put(PasswordAuthenticator.USERNAME_KEY, user); - credentials.put(PasswordAuthenticator.PASSWORD_KEY, passwd); - AuthenticationRequest authenticationRequest = new AuthenticationRequest(credentials); - client.login(authenticationRequest); - } - return client; - } } static class LoaderOptions @@ -376,13 +307,12 @@ public class BulkLoader public boolean debug; public boolean verbose; public boolean noProgress; - public int rpcPort = 9160; + public int nativePort = 9042; public String user; public String passwd; 
public int throttle = 0; public int storagePort; public int sslStoragePort; - public ITransportFactory transportFactory = new TFramedTransportFactory(); public EncryptionOptions encOptions = new EncryptionOptions.ClientEncryptionOptions(); public int connectionsPerHost = 1; public EncryptionOptions.ServerEncryptionOptions serverEncOptions = new EncryptionOptions.ServerEncryptionOptions(); @@ -438,8 +368,8 @@ public class BulkLoader opts.verbose = cmd.hasOption(VERBOSE_OPTION); opts.noProgress = cmd.hasOption(NOPROGRESS_OPTION); - if (cmd.hasOption(RPC_PORT_OPTION)) - opts.rpcPort = Integer.parseInt(cmd.getOptionValue(RPC_PORT_OPTION)); + if (cmd.hasOption(NATIVE_PORT_OPTION)) + opts.nativePort = Integer.parseInt(cmd.getOptionValue(NATIVE_PORT_OPTION)); if (cmd.hasOption(USER_OPTION)) opts.user = cmd.getOptionValue(USER_OPTION); @@ -558,13 +488,6 @@ public class BulkLoader opts.encOptions.cipher_suites = cmd.getOptionValue(SSL_CIPHER_SUITES).split(","); } - if (cmd.hasOption(TRANSPORT_FACTORY)) - { - ITransportFactory transportFactory = getTransportFactory(cmd.getOptionValue(TRANSPORT_FACTORY)); - configureTransportFactory(transportFactory, opts); - opts.transportFactory = transportFactory; - } - return opts; } catch (ParseException | ConfigurationException | MalformedURLException e) @@ -574,50 +497,6 @@ public class BulkLoader } } - private static ITransportFactory getTransportFactory(String transportFactory) - { - try - { - Class<?> factory = Class.forName(transportFactory); - if (!ITransportFactory.class.isAssignableFrom(factory)) - throw new IllegalArgumentException(String.format("transport factory '%s' " + - "not derived from ITransportFactory", transportFactory)); - return (ITransportFactory) factory.newInstance(); - } - catch (Exception e) - { - throw new IllegalArgumentException(String.format("Cannot create a transport factory '%s'.", transportFactory), e); - } - } - - private static void configureTransportFactory(ITransportFactory transportFactory, 
LoaderOptions opts) - { - Map<String, String> options = new HashMap<>(); - // If the supplied factory supports the same set of options as our SSL impl, set those - if (transportFactory.supportedOptions().contains(SSLTransportFactory.TRUSTSTORE)) - options.put(SSLTransportFactory.TRUSTSTORE, opts.encOptions.truststore); - if (transportFactory.supportedOptions().contains(SSLTransportFactory.TRUSTSTORE_PASSWORD)) - options.put(SSLTransportFactory.TRUSTSTORE_PASSWORD, opts.encOptions.truststore_password); - if (transportFactory.supportedOptions().contains(SSLTransportFactory.PROTOCOL)) - options.put(SSLTransportFactory.PROTOCOL, opts.encOptions.protocol); - if (transportFactory.supportedOptions().contains(SSLTransportFactory.CIPHER_SUITES)) - options.put(SSLTransportFactory.CIPHER_SUITES, Joiner.on(',').join(opts.encOptions.cipher_suites)); - - if (transportFactory.supportedOptions().contains(SSLTransportFactory.KEYSTORE) - && opts.encOptions.require_client_auth) - options.put(SSLTransportFactory.KEYSTORE, opts.encOptions.keystore); - if (transportFactory.supportedOptions().contains(SSLTransportFactory.KEYSTORE_PASSWORD) - && opts.encOptions.require_client_auth) - options.put(SSLTransportFactory.KEYSTORE_PASSWORD, opts.encOptions.keystore_password); - - // Now check if any of the factory's supported options are set as system properties - for (String optionKey : transportFactory.supportedOptions()) - if (System.getProperty(optionKey) != null) - options.put(optionKey, System.getProperty(optionKey)); - - transportFactory.setOptions(options); - } - private static void errorMsg(String msg, CmdLineOptions options) { System.err.println(msg); @@ -633,7 +512,7 @@ public class BulkLoader options.addOption(null, NOPROGRESS_OPTION, "don't display progress"); options.addOption("i", IGNORE_NODES_OPTION, "NODES", "don't stream to this (comma separated) list of nodes"); options.addOption("d", INITIAL_HOST_ADDRESS_OPTION, "initial hosts", "Required. 
try to connect to these hosts (comma separated) initially for ring information"); - options.addOption("p", RPC_PORT_OPTION, "rpc port", "port used for rpc (default 9160)"); + options.addOption("p", NATIVE_PORT_OPTION, "rpc port", "port used for native connection (default 9042)"); options.addOption("t", THROTTLE_MBITS, "throttle", "throttle speed in Mbits (default unlimited)"); options.addOption("u", USER_OPTION, "username", "username for cassandra authentication"); options.addOption("pw", PASSWD_OPTION, "password", "password for cassandra authentication"); http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/src/java/org/apache/cassandra/utils/NativeSSTableLoaderClient.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/NativeSSTableLoaderClient.java b/src/java/org/apache/cassandra/utils/NativeSSTableLoaderClient.java new file mode 100644 index 0000000..1ef686c --- /dev/null +++ b/src/java/org/apache/cassandra/utils/NativeSSTableLoaderClient.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.cassandra.utils; + +import java.net.InetAddress; +import java.util.*; + +import com.datastax.driver.core.*; +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.db.ColumnFamilyType; +import org.apache.cassandra.db.SystemKeyspace; +import org.apache.cassandra.db.composites.CellNameType; +import org.apache.cassandra.db.composites.CellNames; +import org.apache.cassandra.db.marshal.AbstractType; +import org.apache.cassandra.db.marshal.TypeParser; +import org.apache.cassandra.dht.*; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.io.sstable.SSTableLoader; +import org.apache.cassandra.schema.LegacySchemaTables; + +public class NativeSSTableLoaderClient extends SSTableLoader.Client +{ + protected final Map<String, CFMetaData> tables; + private final Collection<InetAddress> hosts; + private final int port; + private final String username; + private final String password; + private final SSLOptions sslOptions; + + public NativeSSTableLoaderClient(Collection<InetAddress> hosts, int port, String username, String password, SSLOptions sslOptions) + { + super(); + this.tables = new HashMap<>(); + this.hosts = hosts; + this.port = port; + this.username = username; + this.password = password; + this.sslOptions = sslOptions; + } + + public void init(String keyspace) + { + Cluster.Builder builder = Cluster.builder().addContactPoints(hosts).withPort(port); + if (sslOptions != null) + builder.withSSL(sslOptions); + if (username != null && password != null) + builder = builder.withCredentials(username, password); + + try (Cluster cluster = builder.build()) + { + Session session = cluster.connect(); + Metadata metadata = cluster.getMetadata(); + + setPartitioner(metadata.getPartitioner()); + + Set<TokenRange> tokenRanges = metadata.getTokenRanges(); + + Token.TokenFactory tokenFactory = getPartitioner().getTokenFactory(); + + for (TokenRange tokenRange : tokenRanges) + { + Set<Host> endpoints = 
metadata.getReplicas(keyspace, tokenRange); + Range<Token> range = new Range<>(tokenFactory.fromString(tokenRange.getStart().getValue().toString()), + tokenFactory.fromString(tokenRange.getEnd().getValue().toString())); + for (Host endpoint : endpoints) + addRangeForEndpoint(range, endpoint.getAddress()); + } + + tables.putAll(fetchTablesMetadata(keyspace, session)); + } + } + + public CFMetaData getTableMetadata(String tableName) + { + return tables.get(tableName); + } + + @Override + public void setTableMetadata(CFMetaData cfm) + { + tables.put(cfm.cfName, cfm); + } + + private static Map<String, CFMetaData> fetchTablesMetadata(String keyspace, Session session) + { + Map<String, CFMetaData> tables = new HashMap<>(); + + String query = String.format("SELECT columnfamily_name, cf_id, type, comparator, subcomparator FROM %s.%s WHERE keyspace_name = '%s'", + SystemKeyspace.NAME, + LegacySchemaTables.COLUMNFAMILIES, + keyspace); + + for (Row row : session.execute(query)) + { + String name = row.getString("columnfamily_name"); + UUID id = row.getUUID("cf_id"); + ColumnFamilyType type = ColumnFamilyType.valueOf(row.getString("type")); + AbstractType rawComparator = TypeParser.parse(row.getString("comparator")); + AbstractType subComparator = row.isNull("subcomparator") + ? 
null + : TypeParser.parse(row.getString("subcomparator")); + boolean isDense = row.getBool("is_dense"); + CellNameType comparator = CellNames.fromAbstractType(CFMetaData.makeRawAbstractType(rawComparator, subComparator), + isDense); + + tables.put(name, new CFMetaData(keyspace, name, type, comparator, id)); + } + + return tables; + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/test/pig/org/apache/cassandra/pig/CqlTableTest.java ---------------------------------------------------------------------- diff --git a/test/pig/org/apache/cassandra/pig/CqlTableTest.java b/test/pig/org/apache/cassandra/pig/CqlTableTest.java index 26f9f68..72fdd5a 100644 --- a/test/pig/org/apache/cassandra/pig/CqlTableTest.java +++ b/test/pig/org/apache/cassandra/pig/CqlTableTest.java @@ -22,7 +22,6 @@ import java.io.IOException; import java.util.Iterator; import org.apache.cassandra.exceptions.ConfigurationException; -import org.apache.pig.data.DataBag; import org.apache.pig.data.Tuple; import org.apache.thrift.TException; import org.junit.Assert; @@ -70,6 +69,11 @@ public class CqlTableTest extends PigTestBase "UPDATE collectiontable SET n['key2'] = 'value2' WHERE m = 'book2';", "UPDATE collectiontable SET n['key3'] = 'value3' WHERE m = 'book3';", "UPDATE collectiontable SET n['key4'] = 'value4' WHERE m = 'book4';", + "CREATE TABLE nulltable(m text PRIMARY KEY, n map<text, text>);", + "UPDATE nulltable SET n['key1'] = 'value1' WHERE m = 'book1';", + "UPDATE nulltable SET n['key2'] = 'value2' WHERE m = 'book2';", + "UPDATE nulltable SET n['key3'] = 'value3' WHERE m = 'book3';", + "UPDATE nulltable SET n['key4'] = 'value4' WHERE m = 'book4';", }; @BeforeClass @@ -229,65 +233,32 @@ public class CqlTableTest extends PigTestBase } @Test - public void testCassandraStorageSchema() throws IOException + public void testCqlNativeStorageNullTuples() throws IOException { - //results: (key1,{((111,),),((111,column1),100),((111,column2),10.1)}) - pig.registerQuery("rows = LOAD 
'cassandra://cql3ks/cqltable?" + defaultParameters + "' USING CassandraStorage();"); - - //schema: {key: chararray,columns: {(name: (),value: bytearray)}} - Iterator<Tuple> it = pig.openIterator("rows"); - if (it.hasNext()) { - Tuple t = it.next(); - String rowKey = t.get(0).toString(); - Assert.assertEquals(rowKey, "key1"); - DataBag columns = (DataBag) t.get(1); - Iterator<Tuple> iter = columns.iterator(); - int i = 0; - while (iter.hasNext()) - { - i++; - Tuple column = iter.next(); - if (i==1) - { - Assert.assertEquals(((Tuple) column.get(0)).get(0), 111); - Assert.assertEquals(((Tuple) column.get(0)).get(1), ""); - Assert.assertEquals(column.get(1).toString(), ""); - } - if (i==2) - { - Assert.assertEquals(((Tuple) column.get(0)).get(0), 111); - Assert.assertEquals(((Tuple) column.get(0)).get(1), "column1"); - Assert.assertEquals(column.get(1), 100); - } - if (i==3) - { - Assert.assertEquals(((Tuple) column.get(0)).get(0), 111); - Assert.assertEquals(((Tuple) column.get(0)).get(1), "column2"); - Assert.assertEquals(column.get(1), 10.1f); - } - } - Assert.assertEquals(3, columns.size()); - } - else - { - Assert.fail("Can't fetch any data"); - } + //input_cql=select * from nulltable where token(m) > ? and token(m) <= ? + NullTupleTest("nulltable= LOAD 'cql://cql3ks/collectiontable?" + defaultParameters + nativeParameters + "&input_cql=select%20*%20from%20nulltable%20where%20token(m)%20%3E%20%3F%20and%20token(m)%20%3C%3D%20%3F' USING CqlNativeStorage();"); + } - //results: (key1,(column1,100),(column2,10.1)) - pig.registerQuery("compact_rows = LOAD 'cassandra://cql3ks/compactcqltable?" 
+ defaultParameters + "' USING CassandraStorage();"); + private void NullTupleTest(String initialQuery) throws IOException + { + pig.setBatchOn(); + pig.registerQuery(initialQuery); + pig.registerQuery("recs= FOREACH nulltable GENERATE TOTUPLE(TOTUPLE('m', m) ), TOTUPLE(TOTUPLE('map', TOTUPLE('m', null), TOTUPLE('n', null)));"); + pig.registerQuery("STORE recs INTO 'cql://cql3ks/nulltable?" + defaultParameters + nativeParameters + "&output_query=update+cql3ks.nulltable+set+n+%3D+%3F' USING CqlNativeStorage();"); + pig.executeBatch(); - //schema: {key: chararray,column1: (name: chararray,value: int),column2: (name: chararray,value: float)} - it = pig.openIterator("compact_rows"); + pig.registerQuery("result= LOAD 'cql://cql3ks/nulltable?" + defaultParameters + nativeParameters + "&input_cql=select%20*%20from%20nulltable%20where%20token(m)%20%3E%20%3F%20and%20token(m)%20%3C%3D%20%3F' USING CqlNativeStorage();"); + Iterator<Tuple> it = pig.openIterator("result"); if (it.hasNext()) { Tuple t = it.next(); - String rowKey = t.get(0).toString(); - Assert.assertEquals(rowKey, "key1"); - Tuple column = (Tuple) t.get(1); - Assert.assertEquals(column.get(0), "column1"); - Assert.assertEquals(column.get(1), 100); - column = (Tuple) t.get(2); - Assert.assertEquals(column.get(0), "column2"); - Assert.assertEquals(column.get(1), 10.1f); + Tuple t1 = (Tuple) t.get(1); + Assert.assertEquals(t1.size(), 2); + Tuple element1 = (Tuple) t1.get(0); + Tuple element2 = (Tuple) t1.get(1); + Assert.assertEquals(element1.get(0), "m"); + Assert.assertEquals(element1.get(1), ""); + Assert.assertEquals(element2.get(0), "n"); + Assert.assertEquals(element2.get(1), ""); } else { http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java 
b/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java index 850f46d..6525527 100644 --- a/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java @@ -92,16 +92,19 @@ public class CQLSSTableWriterTest SSTableLoader loader = new SSTableLoader(dataDir, new SSTableLoader.Client() { + private String keyspace; + public void init(String keyspace) { + this.keyspace = keyspace; for (Range<Token> range : StorageService.instance.getLocalRanges("cql_keyspace")) addRangeForEndpoint(range, FBUtilities.getBroadcastAddress()); setPartitioner(StorageService.getPartitioner()); } - public CFMetaData getCFMetaData(String keyspace, String cfName) + public CFMetaData getTableMetadata(String tableName) { - return Schema.instance.getCFMetaData(keyspace, cfName); + return Schema.instance.getCFMetaData(keyspace, tableName); } }, new OutputHandler.SystemOutput(false, false)); @@ -251,16 +254,19 @@ public class CQLSSTableWriterTest SSTableLoader loader = new SSTableLoader(dataDir, new SSTableLoader.Client() { + private String keyspace; + public void init(String keyspace) { + this.keyspace = keyspace; for (Range<Token> range : StorageService.instance.getLocalRanges(KS)) addRangeForEndpoint(range, FBUtilities.getBroadcastAddress()); setPartitioner(StorageService.getPartitioner()); } - public CFMetaData getCFMetaData(String keyspace, String cfName) + public CFMetaData getTableMetadata(String tableName) { - return Schema.instance.getCFMetaData(keyspace, cfName); + return Schema.instance.getCFMetaData(keyspace, tableName); } }, new OutputHandler.SystemOutput(false, false)); http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java 
b/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java index b245994..4a51fbd 100644 --- a/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java @@ -83,16 +83,19 @@ public class SSTableLoaderTest SSTableLoader loader = new SSTableLoader(dataDir, new SSTableLoader.Client() { + private String keyspace; + public void init(String keyspace) { + this.keyspace = keyspace; for (Range<Token> range : StorageService.instance.getLocalRanges(KEYSPACE1)) addRangeForEndpoint(range, FBUtilities.getBroadcastAddress()); setPartitioner(StorageService.getPartitioner()); } - public CFMetaData getCFMetaData(String keyspace, String cfName) + public CFMetaData getTableMetadata(String tableName) { - return Schema.instance.getCFMetaData(keyspace, cfName); + return Schema.instance.getCFMetaData(keyspace, tableName); } }, new OutputHandler.SystemOutput(false, false));
