Repository: cassandra Updated Branches: refs/heads/cassandra-2.0 ad6ba3d24 -> 563cea14b refs/heads/cassandra-2.1 54fbb0abb -> de5bb5854 refs/heads/trunk fe8829fa6 -> ff88f2376
Fix sstableloader unable to connect encrypted node patch by yukim; reviewed by krummas for CASSANDRA-7585 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/563cea14 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/563cea14 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/563cea14 Branch: refs/heads/cassandra-2.0 Commit: 563cea14b4bb87cd37ab10399904f08757c34d27 Parents: ad6ba3d Author: Yuki Morishita <yu...@apache.org> Authored: Fri Aug 15 12:31:59 2014 -0500 Committer: Yuki Morishita <yu...@apache.org> Committed: Fri Aug 15 12:31:59 2014 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../config/YamlConfigurationLoader.java | 6 +- .../cassandra/io/sstable/SSTableLoader.java | 22 ++- .../cassandra/streaming/ConnectionHandler.java | 48 +----- .../streaming/DefaultConnectionFactory.java | 74 +++++++++ .../streaming/StreamConnectionFactory.java | 30 ++++ .../apache/cassandra/streaming/StreamPlan.java | 16 +- .../cassandra/streaming/StreamResultFuture.java | 2 +- .../cassandra/streaming/StreamSession.java | 13 +- .../tools/BulkLoadConnectionFactory.java | 68 +++++++++ .../org/apache/cassandra/tools/BulkLoader.java | 149 +++++++++++++------ .../streaming/StreamTransferTaskTest.java | 2 +- 12 files changed, 330 insertions(+), 101 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/563cea14/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 4306de5..e335484 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -47,6 +47,7 @@ * Backport CASSANDRA-6747 (CASSANDRA-7560) * Track max/min timestamps for range tombstones (CASSANDRA-7647) * Fix NPE when listing saved caches dir (CASSANDRA-7632) + * Fix sstableloader unable to connect encrypted node (CASSANDRA-7585) Merged from 1.2: * Remove duplicates from StorageService.getJoiningNodes (CASSANDRA-7478) * Clone token map outside of hot gossip loops (CASSANDRA-7758) http://git-wip-us.apache.org/repos/asf/cassandra/blob/563cea14/src/java/org/apache/cassandra/config/YamlConfigurationLoader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/YamlConfigurationLoader.java b/src/java/org/apache/cassandra/config/YamlConfigurationLoader.java index 6b5a152..b520d07 100644 --- a/src/java/org/apache/cassandra/config/YamlConfigurationLoader.java +++ b/src/java/org/apache/cassandra/config/YamlConfigurationLoader.java @@ -69,10 +69,14 @@ public class YamlConfigurationLoader implements ConfigurationLoader public Config loadConfig() throws ConfigurationException { + return loadConfig(getStorageConfigURL()); + } + + public Config loadConfig(URL url) throws ConfigurationException + { InputStream input = null; try { - URL url = getStorageConfigURL(); logger.info("Loading settings from {}", url); try { http://git-wip-us.apache.org/repos/asf/cassandra/blob/563cea14/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java index 4a1604d..85dc0e4 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java @@ -50,7 +50,7 @@ public class SSTableLoader implements StreamEventHandler private final OutputHandler outputHandler; private final Set<InetAddress> failedHosts = new HashSet<>(); - private final List<SSTableReader> sstables = new ArrayList<SSTableReader>(); + private final List<SSTableReader> sstables = new ArrayList<>(); private final Multimap<InetAddress, StreamSession.SSTableStreamingSections> streamingDetails = HashMultimap.create(); static @@ -94,7 +94,7 @@ public class SSTableLoader implements StreamEventHandler return false; } - Set<Component> components = new HashSet<Component>(); + Set<Component> components = new HashSet<>(); components.add(Component.DATA); components.add(Component.PRIMARY_INDEX); if (new File(desc.filenameFor(Component.SUMMARY)).exists()) @@ -149,7 +149,7 @@ public class SSTableLoader implements StreamEventHandler client.init(keyspace); outputHandler.output("Established connection to initial hosts"); - StreamPlan plan = new StreamPlan("Bulk Load"); + StreamPlan plan = new StreamPlan("Bulk Load").connectionFactory(client.getConnectionFactory()); Map<InetAddress, Collection<Range<Token>>> endpointToRanges = client.getEndpointToRangesMap(); openSSTables(endpointToRanges); @@ -220,7 +220,7 @@ public class SSTableLoader implements StreamEventHandler public static abstract class Client { - private final Map<InetAddress, Collection<Range<Token>>> endpointToRanges = new HashMap<InetAddress, Collection<Range<Token>>>(); + private final Map<InetAddress, Collection<Range<Token>>> endpointToRanges = new HashMap<>(); private IPartitioner partitioner; /** @@ -240,6 +240,17 @@ public class SSTableLoader implements StreamEventHandler public void stop() {} /** + * Provides connection factory. + * By default, it uses DefaultConnectionFactory. + * + * @return StreamConnectionFactory to use + */ + public StreamConnectionFactory getConnectionFactory() + { + return new DefaultConnectionFactory(); + } + + /** * Validate that {@code keyspace} is an existing keyspace and {@code * cfName} one of its existing column family. */ @@ -258,6 +269,7 @@ public class SSTableLoader implements StreamEventHandler protected void setPartitioner(IPartitioner partitioner) { this.partitioner = partitioner; + // the following is still necessary since Range/Token reference partitioner through StorageService.getPartitioner DatabaseDescriptor.setPartitioner(partitioner); } @@ -271,7 +283,7 @@ public class SSTableLoader implements StreamEventHandler Collection<Range<Token>> ranges = endpointToRanges.get(endpoint); if (ranges == null) { - ranges = new HashSet<Range<Token>>(); + ranges = new HashSet<>(); endpointToRanges.put(endpoint, ranges); } ranges.add(range); http://git-wip-us.apache.org/repos/asf/cassandra/blob/563cea14/src/java/org/apache/cassandra/streaming/ConnectionHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java index b06a818..8fba41b 100644 --- a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java +++ b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java @@ -18,7 +18,6 @@ package org.apache.cassandra.streaming; import java.io.IOException; -import java.net.InetAddress; import java.net.Socket; import java.net.SocketException; import java.nio.ByteBuffer; @@ -37,8 +36,6 @@ import com.google.common.util.concurrent.SettableFuture; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.net.OutboundTcpConnectionPool; import org.apache.cassandra.streaming.messages.StreamInitMessage; import org.apache.cassandra.streaming.messages.StreamMessage; import org.apache.cassandra.utils.FBUtilities; @@ -55,8 +52,6 @@ public class ConnectionHandler { private static final Logger logger = LoggerFactory.getLogger(ConnectionHandler.class); - private static final int MAX_CONNECT_ATTEMPTS = 3; - private final StreamSession session; private IncomingMessageHandler incoming; @@ -79,12 +74,12 @@ public class ConnectionHandler public void initiate() throws IOException { logger.debug("[Stream #{}] Sending stream init for incoming stream", session.planId()); - Socket incomingSocket = connect(session.peer); + Socket incomingSocket = session.createConnection(); incoming.start(incomingSocket, StreamMessage.CURRENT_VERSION); incoming.sendInitMessage(incomingSocket, true); logger.debug("[Stream #{}] Sending stream init for outgoing stream", session.planId()); - Socket outgoingSocket = connect(session.peer); + Socket outgoingSocket = session.createConnection(); outgoing.start(outgoingSocket, StreamMessage.CURRENT_VERSION); outgoing.sendInitMessage(outgoingSocket, false); } @@ -104,45 +99,6 @@ public class ConnectionHandler incoming.start(socket, version); } - /** - * Connect to peer and start exchanging message. - * When connect attempt fails, this retries for maximum of MAX_CONNECT_ATTEMPTS times. - * - * @param peer the peer to connect to. - * @return the created socket. - * - * @throws IOException when connection failed. - */ - private static Socket connect(InetAddress peer) throws IOException - { - int attempts = 0; - while (true) - { - try - { - Socket socket = OutboundTcpConnectionPool.newSocket(peer); - socket.setSoTimeout(DatabaseDescriptor.getStreamingSocketTimeout()); - return socket; - } - catch (IOException e) - { - if (++attempts >= MAX_CONNECT_ATTEMPTS) - throw e; - - long waitms = DatabaseDescriptor.getRpcTimeout() * (long)Math.pow(2, attempts); - logger.warn("Failed attempt " + attempts + " to connect to " + peer + ". Retrying in " + waitms + " ms. (" + e + ")"); - try - { - Thread.sleep(waitms); - } - catch (InterruptedException wtf) - { - throw new IOException("interrupted", wtf); - } - } - } - } - public ListenableFuture<?> close() { logger.debug("[Stream #{}] Closing stream connection handler on {}", session.planId(), session.peer); http://git-wip-us.apache.org/repos/asf/cassandra/blob/563cea14/src/java/org/apache/cassandra/streaming/DefaultConnectionFactory.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/DefaultConnectionFactory.java b/src/java/org/apache/cassandra/streaming/DefaultConnectionFactory.java new file mode 100644 index 0000000..53af4c8 --- /dev/null +++ b/src/java/org/apache/cassandra/streaming/DefaultConnectionFactory.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.streaming; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.Socket; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.net.OutboundTcpConnectionPool; + +public class DefaultConnectionFactory implements StreamConnectionFactory +{ + private static final Logger logger = LoggerFactory.getLogger(DefaultConnectionFactory.class); + + private static final int MAX_CONNECT_ATTEMPTS = 3; + + /** + * Connect to peer and start exchanging message. + * When connect attempt fails, this retries for maximum of MAX_CONNECT_ATTEMPTS times. + * + * @param peer the peer to connect to. + * @return the created socket. + * + * @throws IOException when connection failed. + */ + public Socket createConnection(InetAddress peer) throws IOException + { + int attempts = 0; + while (true) + { + try + { + Socket socket = OutboundTcpConnectionPool.newSocket(peer); + socket.setSoTimeout(DatabaseDescriptor.getStreamingSocketTimeout()); + return socket; + } + catch (IOException e) + { + if (++attempts >= MAX_CONNECT_ATTEMPTS) + throw e; + + long waitms = DatabaseDescriptor.getRpcTimeout() * (long)Math.pow(2, attempts); + logger.warn("Failed attempt " + attempts + " to connect to " + peer + ". Retrying in " + waitms + " ms. (" + e + ")"); + try + { + Thread.sleep(waitms); + } + catch (InterruptedException wtf) + { + throw new IOException("interrupted", wtf); + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/563cea14/src/java/org/apache/cassandra/streaming/StreamConnectionFactory.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamConnectionFactory.java b/src/java/org/apache/cassandra/streaming/StreamConnectionFactory.java new file mode 100644 index 0000000..dd99611 --- /dev/null +++ b/src/java/org/apache/cassandra/streaming/StreamConnectionFactory.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.streaming; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.Socket; + +/** + * Interface that creates connection used by streaming. + */ +public interface StreamConnectionFactory +{ + Socket createConnection(InetAddress peer) throws IOException; +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/563cea14/src/java/org/apache/cassandra/streaming/StreamPlan.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamPlan.java b/src/java/org/apache/cassandra/streaming/StreamPlan.java index b57e097..e582c79 100644 --- a/src/java/org/apache/cassandra/streaming/StreamPlan.java +++ b/src/java/org/apache/cassandra/streaming/StreamPlan.java @@ -38,6 +38,8 @@ public class StreamPlan // sessions per InetAddress of the other end. private final Map<InetAddress, StreamSession> sessions = new HashMap<>(); + private StreamConnectionFactory connectionFactory = new DefaultConnectionFactory(); + private boolean flushBeforeTransfer = true; /** @@ -132,6 +134,18 @@ public class StreamPlan } /** + * Set custom StreamConnectionFactory to be used for establishing connection + * + * @param factory StreamConnectionFactory to use + * @return self + */ + public StreamPlan connectionFactory(StreamConnectionFactory factory) + { + this.connectionFactory = factory; + return this; + } + + /** * @return true if this plan has no plan to execute */ public boolean isEmpty() @@ -167,7 +181,7 @@ public class StreamPlan StreamSession session = sessions.get(peer); if (session == null) { - session = new StreamSession(peer); + session = new StreamSession(peer, connectionFactory); sessions.put(peer, session); } return session; http://git-wip-us.apache.org/repos/asf/cassandra/blob/563cea14/src/java/org/apache/cassandra/streaming/StreamResultFuture.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java index dcffaff..add14f7 100644 --- a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java +++ b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java @@ -106,7 +106,7 @@ public final class StreamResultFuture extends AbstractFuture<StreamState> StreamResultFuture future = StreamManager.instance.getReceivingStream(planId); if (future == null) { - final StreamSession session = new StreamSession(from); + final StreamSession session = new StreamSession(from, null); // The main reason we create a StreamResultFuture on the receiving side is for JMX exposure. future = new StreamResultFuture(planId, description, Collections.singleton(session)); http://git-wip-us.apache.org/repos/asf/cassandra/blob/563cea14/src/java/org/apache/cassandra/streaming/StreamSession.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java index 55e30f0..4fcbe36 100644 --- a/src/java/org/apache/cassandra/streaming/StreamSession.java +++ b/src/java/org/apache/cassandra/streaming/StreamSession.java @@ -19,6 +19,7 @@ package org.apache.cassandra.streaming; import java.io.IOException; import java.net.InetAddress; +import java.net.Socket; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; @@ -128,6 +129,8 @@ public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDe // data receivers, filled after receiving prepare message private final Map<UUID, StreamReceiveTask> receivers = new ConcurrentHashMap<>(); private final StreamingMetrics metrics; + /* can be null when session is created in remote */ + private final StreamConnectionFactory factory; public final ConnectionHandler handler; @@ -152,10 +155,12 @@ public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDe * Create new streaming session with the peer. * * @param peer Address of streaming peer + * @param factory is used for establishing connection */ - public StreamSession(InetAddress peer) + public StreamSession(InetAddress peer, StreamConnectionFactory factory) { this.peer = peer; + this.factory = factory; this.handler = new ConnectionHandler(this); this.metrics = StreamingMetrics.get(peer); } @@ -211,6 +216,12 @@ public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDe }); } + public Socket createConnection() throws IOException + { + assert factory != null; + return factory.createConnection(peer); + } + /** * Request data fetch task to this session. * http://git-wip-us.apache.org/repos/asf/cassandra/blob/563cea14/src/java/org/apache/cassandra/tools/BulkLoadConnectionFactory.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/BulkLoadConnectionFactory.java b/src/java/org/apache/cassandra/tools/BulkLoadConnectionFactory.java new file mode 100644 index 0000000..399344e --- /dev/null +++ b/src/java/org/apache/cassandra/tools/BulkLoadConnectionFactory.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.tools; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.nio.channels.SocketChannel; +import java.util.HashSet; +import java.util.Set; + +import org.apache.cassandra.config.EncryptionOptions; +import org.apache.cassandra.security.SSLFactory; +import org.apache.cassandra.streaming.StreamConnectionFactory; +import org.apache.cassandra.utils.FBUtilities; + +public class BulkLoadConnectionFactory implements StreamConnectionFactory +{ + private final boolean outboundBindAny; + private final int storagePort; + private final int secureStoragePort; + private final EncryptionOptions.ServerEncryptionOptions encryptionOptions; + + public BulkLoadConnectionFactory(int storagePort, int secureStoragePort, EncryptionOptions.ServerEncryptionOptions encryptionOptions, boolean outboundBindAny) + { + this.storagePort = storagePort; + this.secureStoragePort = secureStoragePort; + this.encryptionOptions = encryptionOptions; + this.outboundBindAny = outboundBindAny; + } + + public Socket createConnection(InetAddress peer) throws IOException + { + // Connect to secure port for all peers if ServerEncryptionOptions is configured other than 'none' + // When 'all', 'dc' and 'rack', server nodes always have SSL port open, and since thin client like sstableloader + // does not know which node is in which dc/rack, connecting to SSL port is always the option. + if (encryptionOptions != null && encryptionOptions.internode_encryption != EncryptionOptions.ServerEncryptionOptions.InternodeEncryption.none) + { + if (outboundBindAny) + return SSLFactory.getSocket(encryptionOptions, peer, secureStoragePort); + else + return SSLFactory.getSocket(encryptionOptions, peer, secureStoragePort, FBUtilities.getLocalAddress(), 0); + } + else + { + Socket socket = SocketChannel.open(new InetSocketAddress(peer, storagePort)).socket(); + if (outboundBindAny && !socket.isBound()) + socket.bind(new InetSocketAddress(FBUtilities.getLocalAddress(), 0)); + return socket; + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/563cea14/src/java/org/apache/cassandra/tools/BulkLoader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/BulkLoader.java b/src/java/org/apache/cassandra/tools/BulkLoader.java index 37ec635..4077722 100644 --- a/src/java/org/apache/cassandra/tools/BulkLoader.java +++ b/src/java/org/apache/cassandra/tools/BulkLoader.java @@ -18,29 +18,27 @@ package org.apache.cassandra.tools; import java.io.File; -import java.net.InetAddress; -import java.net.UnknownHostException; +import java.net.*; import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import com.google.common.base.Joiner; import com.google.common.collect.Sets; -import org.apache.cassandra.config.EncryptionOptions; + +import org.apache.cassandra.config.*; + import org.apache.commons.cli.*; import org.apache.thrift.protocol.TBinaryProtocol; import org.apache.thrift.protocol.TProtocol; -import org.apache.thrift.transport.TFramedTransport; -import org.apache.thrift.transport.TSocket; import org.apache.thrift.transport.TTransport; import org.apache.cassandra.auth.IAuthenticator; -import org.apache.cassandra.config.CFMetaData; -import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.db.SystemKeyspace; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; +import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.io.sstable.SSTableLoader; import org.apache.cassandra.streaming.*; import org.apache.cassandra.thrift.*; @@ -60,7 +58,10 @@ public class BulkLoader private static final String USER_OPTION = "username"; private static final String PASSWD_OPTION = "password"; private static final String THROTTLE_MBITS = "throttle"; + private static final String TRANSPORT_FACTORY = "transport-factory"; + + /* client encryption options */ private static final String SSL_TRUSTSTORE = "truststore"; private static final String SSL_TRUSTSTORE_PW = "truststore-password"; private static final String SSL_KEYSTORE = "keystore"; @@ -69,12 +70,20 @@ public class BulkLoader private static final String SSL_ALGORITHM = "ssl-alg"; private static final String SSL_STORE_TYPE = "store-type"; private static final String SSL_CIPHER_SUITES = "ssl-ciphers"; + private static final String CONFIG_PATH = "conf-path"; public static void main(String args[]) { LoaderOptions options = LoaderOptions.parseArgs(args); OutputHandler handler = new OutputHandler.SystemOutput(options.verbose, options.debug); - SSTableLoader loader = new SSTableLoader(options.directory, new ExternalClient(options.hosts, options.rpcPort, options.user, options.passwd, options.transportFactory), handler); + SSTableLoader loader = new SSTableLoader(options.directory, new ExternalClient(options.hosts, + options.rpcPort, + options.user, + options.passwd, + options.transportFactory, + options.storagePort, + options.sslStoragePort, + options.serverEncOptions), handler); DatabaseDescriptor.setStreamThroughputOutboundMegabitsPerSec(options.throttle); StreamResultFuture future = null; try @@ -207,8 +216,18 @@ public class BulkLoader private final String user; private final String passwd; private final ITransportFactory transportFactory; - - public ExternalClient(Set<InetAddress> hosts, int port, String user, String passwd, ITransportFactory transportFactory) + private final int storagePort; + private final int sslStoragePort; + private final EncryptionOptions.ServerEncryptionOptions serverEncOptions; + + public ExternalClient(Set<InetAddress> hosts, + int port, + String user, + String passwd, + ITransportFactory transportFactory, + int storagePort, + int sslStoragePort, + EncryptionOptions.ServerEncryptionOptions serverEncryptionOptions) { super(); this.hosts = hosts; @@ -216,8 +235,12 @@ public class BulkLoader this.user = user; this.passwd = passwd; this.transportFactory = transportFactory; + this.storagePort = storagePort; + this.sslStoragePort = sslStoragePort; + this.serverEncOptions = serverEncryptionOptions; } + @Override public void init(String keyspace) { Iterator<InetAddress> hostiter = hosts.iterator(); @@ -234,7 +257,7 @@ public class BulkLoader for (TokenRange tr : client.describe_ring(keyspace)) { - Range<Token> range = new Range<>(tkFactory.fromString(tr.start_token), tkFactory.fromString(tr.end_token)); + Range<Token> range = new Range<>(tkFactory.fromString(tr.start_token), tkFactory.fromString(tr.end_token), getPartitioner()); for (String ep : tr.endpoints) { addRangeForEndpoint(range, InetAddress.getByName(ep)); @@ -261,6 +284,13 @@ public class BulkLoader } } + @Override + public StreamConnectionFactory getConnectionFactory() + { + return new BulkLoadConnectionFactory(storagePort, sslStoragePort, serverEncOptions, false); + } + + @Override public CFMetaData getCFMetaData(String keyspace, String cfName) { return knownCfs.get(cfName); @@ -273,7 +303,7 @@ public class BulkLoader Cassandra.Client client = new Cassandra.Client(protocol); if (user != null && passwd != null) { - Map<String, String> credentials = new HashMap<String, String>(); + Map<String, String> credentials = new HashMap<>(); credentials.put(IAuthenticator.USERNAME_KEY, user); credentials.put(IAuthenticator.PASSWORD_KEY, passwd); AuthenticationRequest authenticationRequest = new AuthenticationRequest(credentials); @@ -294,11 +324,14 @@ public class BulkLoader public String user; public String passwd; public int throttle = 0; + public int storagePort; + public int sslStoragePort; public ITransportFactory transportFactory = new TFramedTransportFactory(); public EncryptionOptions encOptions = new EncryptionOptions.ClientEncryptionOptions(); + public EncryptionOptions.ServerEncryptionOptions serverEncOptions = new EncryptionOptions.ServerEncryptionOptions(); - public final Set<InetAddress> hosts = new HashSet<InetAddress>(); - public final Set<InetAddress> ignores = new HashSet<InetAddress>(); + public final Set<InetAddress> hosts = new HashSet<>(); + public final Set<InetAddress> ignores = new HashSet<>(); LoaderOptions(File directory) { @@ -349,9 +382,6 @@ public class BulkLoader opts.verbose = cmd.hasOption(VERBOSE_OPTION); opts.noProgress = cmd.hasOption(NOPROGRESS_OPTION); - if (cmd.hasOption(THROTTLE_MBITS)) - opts.throttle = Integer.parseInt(cmd.getOptionValue(THROTTLE_MBITS)); - if (cmd.hasOption(RPC_PORT_OPTION)) opts.rpcPort = Integer.parseInt(cmd.getOptionValue(RPC_PORT_OPTION)); @@ -400,44 +430,71 @@ public class BulkLoader } } - if(cmd.hasOption(SSL_TRUSTSTORE)) + // try to load config file first, so that values can be rewritten with other option values. + // otherwise use default config. + Config config; + if (cmd.hasOption(CONFIG_PATH)) + { + File configFile = new File(cmd.getOptionValue(CONFIG_PATH)); + if (!configFile.exists()) + { + errorMsg("Config file not found", options); + } + config = new YamlConfigurationLoader().loadConfig(configFile.toURI().toURL()); + } + else + { + config = new Config(); + } + opts.storagePort = config.storage_port; + opts.sslStoragePort = config.ssl_storage_port; + opts.throttle = config.stream_throughput_outbound_megabits_per_sec; + opts.encOptions = config.client_encryption_options; + opts.serverEncOptions = config.server_encryption_options; + + if (cmd.hasOption(THROTTLE_MBITS)) + { + opts.throttle = Integer.parseInt(cmd.getOptionValue(THROTTLE_MBITS)); + } + + if (cmd.hasOption(SSL_TRUSTSTORE)) { opts.encOptions.truststore = cmd.getOptionValue(SSL_TRUSTSTORE); } - if(cmd.hasOption(SSL_TRUSTSTORE_PW)) + if (cmd.hasOption(SSL_TRUSTSTORE_PW)) { opts.encOptions.truststore_password = cmd.getOptionValue(SSL_TRUSTSTORE_PW); } - if(cmd.hasOption(SSL_KEYSTORE)) + if (cmd.hasOption(SSL_KEYSTORE)) { opts.encOptions.keystore = cmd.getOptionValue(SSL_KEYSTORE); // if a keystore was provided, lets assume we'll need to use it opts.encOptions.require_client_auth = true; } - if(cmd.hasOption(SSL_KEYSTORE_PW)) + if (cmd.hasOption(SSL_KEYSTORE_PW)) { opts.encOptions.keystore_password = cmd.getOptionValue(SSL_KEYSTORE_PW); } - if(cmd.hasOption(SSL_PROTOCOL)) + if (cmd.hasOption(SSL_PROTOCOL)) { opts.encOptions.protocol = cmd.getOptionValue(SSL_PROTOCOL); } - if(cmd.hasOption(SSL_ALGORITHM)) + if (cmd.hasOption(SSL_ALGORITHM)) { opts.encOptions.algorithm = cmd.getOptionValue(SSL_ALGORITHM); } - if(cmd.hasOption(SSL_STORE_TYPE)) + if (cmd.hasOption(SSL_STORE_TYPE)) { opts.encOptions.store_type = cmd.getOptionValue(SSL_STORE_TYPE); } - if(cmd.hasOption(SSL_CIPHER_SUITES)) + if (cmd.hasOption(SSL_CIPHER_SUITES)) { opts.encOptions.cipher_suites = cmd.getOptionValue(SSL_CIPHER_SUITES).split(","); } @@ -451,7 +508,7 @@ public class BulkLoader return opts; } - catch (ParseException e) + catch (ParseException | ConfigurationException | MalformedURLException e) { errorMsg(e.getMessage(), options); return null; @@ -508,6 +565,7 @@ public class BulkLoader printUsage(options); System.exit(1); } + private static CmdLineOptions getCmdLineOptions() { CmdLineOptions options = new CmdLineOptions(); @@ -516,37 +574,38 @@ public class BulkLoader options.addOption("h", HELP_OPTION, "display this help message"); options.addOption(null, NOPROGRESS_OPTION, "don't display progress"); options.addOption("i", IGNORE_NODES_OPTION, "NODES", "don't stream to this (comma separated) list of nodes"); - options.addOption("d", INITIAL_HOST_ADDRESS_OPTION, "initial hosts", "try to connect to these hosts (comma separated) initially for ring information"); + options.addOption("d", INITIAL_HOST_ADDRESS_OPTION, "initial hosts", "Required. try to connect to these hosts (comma separated) initially for ring information"); options.addOption("p", RPC_PORT_OPTION, "rpc port", "port used for rpc (default 9160)"); options.addOption("t", THROTTLE_MBITS, "throttle", "throttle speed in Mbits (default unlimited)"); options.addOption("u", USER_OPTION, "username", "username for cassandra authentication"); options.addOption("pw", PASSWD_OPTION, "password", "password for cassandra authentication"); options.addOption("tf", TRANSPORT_FACTORY, "transport factory", "Fully-qualified ITransportFactory class name for creating a connection to cassandra"); // ssl connection-related options - options.addOption("ts", SSL_TRUSTSTORE, "TRUSTSTORE", "SSL: full path to truststore"); - options.addOption("tspw", SSL_TRUSTSTORE_PW, "TRUSTSTORE-PASSWORD", "SSL: password of the truststore"); - options.addOption("ks", SSL_KEYSTORE, "KEYSTORE", "SSL: full path to keystore"); - options.addOption("kspw", SSL_KEYSTORE_PW, "KEYSTORE-PASSWORD", "SSL: password of the keystore"); - options.addOption("prtcl", SSL_PROTOCOL, "PROTOCOL", "SSL: connections protocol to use (default: TLS)"); - options.addOption("alg", SSL_ALGORITHM, "ALGORITHM", "SSL: algorithm (default: SunX509)"); - options.addOption("st", SSL_STORE_TYPE, "STORE-TYPE", "SSL: type of store"); - options.addOption("ciphers", SSL_CIPHER_SUITES, "CIPHER-SUITES", "SSL: comma-separated list of encryption suites to use"); + options.addOption("ts", SSL_TRUSTSTORE, "TRUSTSTORE", "Client SSL: full path to truststore"); + options.addOption("tspw", SSL_TRUSTSTORE_PW, "TRUSTSTORE-PASSWORD", "Client SSL: password of the truststore"); + options.addOption("ks", SSL_KEYSTORE, "KEYSTORE", "Client SSL: full path to keystore"); + options.addOption("kspw", SSL_KEYSTORE_PW, "KEYSTORE-PASSWORD", "Client SSL: password of the keystore"); + options.addOption("prtcl", SSL_PROTOCOL, "PROTOCOL", "Client SSL: connections protocol to use (default: TLS)"); + options.addOption("alg", SSL_ALGORITHM, "ALGORITHM", "Client SSL: algorithm (default: SunX509)"); + options.addOption("st", SSL_STORE_TYPE, "STORE-TYPE", "Client SSL: type of store"); + options.addOption("ciphers", SSL_CIPHER_SUITES, "CIPHER-SUITES", "Client SSL: comma-separated list of encryption suites to use"); + options.addOption("f", CONFIG_PATH, "path to config file", "cassandra.yaml file path for streaming throughput and client/server SSL."); return options; } public static void printUsage(Options options) { String usage = String.format("%s [options] <dir_path>", TOOL_NAME); - StringBuilder header = new StringBuilder(); - header.append("--\n"); - header.append("Bulk load the sstables found in the directory <dir_path> to the configured cluster." ); - header.append("The parent directory of <dir_path> is used as the keyspace name. "); - header.append("So for instance, to load an sstable named Standard1-g-1-Data.db into keyspace Keyspace1, "); - header.append("you will need to have the files Standard1-g-1-Data.db and Standard1-g-1-Index.db in a "); - header.append("directory Keyspace1/Standard1/ in the directory and call: sstableloader Keyspace1/Standard1"); - header.append("\n--\n"); - header.append("Options are:"); - new HelpFormatter().printHelp(usage, header.toString(), options, ""); + String header = System.lineSeparator() + + "Bulk load the sstables found in the directory <dir_path> to the configured cluster." + + "The parent directories of <dir_path> are used as the target keyspace/table name. " + + "So for instance, to load an sstable named Standard1-g-1-Data.db into Keyspace1/Standard1, " + + "you will need to have the files Standard1-g-1-Data.db and Standard1-g-1-Index.db into a directory /path/to/Keyspace1/Standard1/."; + String footer = System.lineSeparator() + + "You can provide cassandra.yaml file with -f command line option to set up streaming throughput, client and server encryption options. " + + "Only stream_throughput_outbound_megabits_per_sec, server_encryption_options and client_encryption_options are read from yaml. " + + "You can override options read from cassandra.yaml with corresponding command line options."; + new HelpFormatter().printHelp(usage, header, options, footer); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/563cea14/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java index 9b02817..ce0f9d0 100644 --- a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java +++ b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java @@ -43,7 +43,7 @@ public class StreamTransferTaskTest extends SchemaLoader String ks = "Keyspace1"; String cf = "Standard1"; - StreamSession session = new StreamSession(FBUtilities.getBroadcastAddress()); + StreamSession session = new StreamSession(FBUtilities.getBroadcastAddress(), null); ColumnFamilyStore cfs = Keyspace.open(ks).getColumnFamilyStore(cf); // create two sstables