Repository: cassandra Updated Branches: refs/heads/trunk ef54c6c72 -> 1944e402c
(Pig) support BulkOutputFormat as a URL parameter patch by Alex Liu; reviewed by Piotr Kołaczkowski for CASSANDRA-7410 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c7b40735 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c7b40735 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c7b40735 Branch: refs/heads/trunk Commit: c7b40735789c840529002eb3c11d8731f460d61c Parents: ae51086 Author: Alex Liu <[email protected]> Authored: Tue Sep 15 16:06:18 2015 +0100 Committer: Aleksey Yeschenko <[email protected]> Committed: Tue Sep 15 16:08:54 2015 +0100 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../hadoop/cql3/CqlBulkOutputFormat.java | 93 +++++++- .../hadoop/cql3/CqlBulkRecordWriter.java | 87 ++++---- .../cassandra/hadoop/pig/CqlNativeStorage.java | 213 +++++++++++++------ .../apache/cassandra/hadoop/pig/CqlStorage.java | 1 - .../org/apache/cassandra/tools/BulkLoader.java | 2 +- test/conf/cassandra.yaml | 1 + .../org/apache/cassandra/pig/CqlTableTest.java | 36 ++++ .../org/apache/cassandra/pig/PigTestBase.java | 3 +- 9 files changed, 336 insertions(+), 101 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/c7b40735/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index dff47fc..5f11049 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 2.1.10 + * (Pig) support BulkOutputFormat as a URL parameter (CASSANDRA-7410) * BATCH statement is broken in cqlsh (CASSANDRA-10272) * Added configurable warning threshold for GC duration (CASSANDRA-8907) * (cqlsh) Make cqlsh PEP8 compliant (CASSANDRA-10066) http://git-wip-us.apache.org/repos/asf/cassandra/blob/c7b40735/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkOutputFormat.java 
---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkOutputFormat.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkOutputFormat.java index 887fe8e..7fedb41 100644 --- a/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkOutputFormat.java +++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkOutputFormat.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.List; +import org.apache.cassandra.config.EncryptionOptions; import org.apache.cassandra.hadoop.AbstractBulkOutputFormat; import org.apache.cassandra.hadoop.ConfigHelper; import org.apache.hadoop.conf.Configuration; @@ -54,6 +55,16 @@ public class CqlBulkOutputFormat extends AbstractBulkOutputFormat<Object, List<B private static final String OUTPUT_CQL_SCHEMA_PREFIX = "cassandra.columnfamily.schema."; private static final String OUTPUT_CQL_INSERT_PREFIX = "cassandra.columnfamily.insert."; private static final String DELETE_SOURCE = "cassandra.output.delete.source"; + private static final String OUTPUT_CQL_STORAGE_PORT = "cassandra.storage.port"; + private static final String OUTPUT_CQL_SSL_STORAGE_PORT = "cassandra.ssl.storage.port"; + private static final String INTERNODE_ENCRYPTION = "cassandra.internode.encryption"; + private static final String SERVER_KEYSTORE = "cassandra.server.keystore"; + private static final String SERVER_KEYSTORE_PASSWORD = "cassandra.server.keystore.password"; + private static final String SERVER_TRUSTSTORE = "cassandra.server.truststore"; + private static final String SERVER_TRUSTSTORE_PASSWORD = "cassandra.server.truststore.password"; + private static final String SERVER_CIPHER_SUITES = "cassandra.server.truststore.password"; + public static final int DEFAULT_STORAGE_PORT = 7000; + public static final int DEFAULT_SSL_STORAGE_PORT = 7001; /** Fills the deprecated OutputFormat interface for streaming. 
*/ @Deprecated @@ -84,7 +95,87 @@ public class CqlBulkOutputFormat extends AbstractBulkOutputFormat<Object, List<B { conf.set(OUTPUT_CQL_INSERT_PREFIX + columnFamily, insertStatement); } - + + public static void setStoragePort(Configuration conf, int port) + { + conf.set(OUTPUT_CQL_STORAGE_PORT, "" + port); + } + + public static void setSSLStoragePort(Configuration conf, int port) + { + conf.set(OUTPUT_CQL_SSL_STORAGE_PORT, "" + port); + } + + public static void setInternodeEncryption(Configuration conf, String encrypt) + { + conf.set(INTERNODE_ENCRYPTION, encrypt); + } + + public static void setServerKeystore(Configuration conf, String keystore) + { + conf.set(SERVER_KEYSTORE, keystore); + } + + public static void setServerKeystorePassword(Configuration conf, String keystorePass) + { + conf.set(SERVER_KEYSTORE_PASSWORD, keystorePass); + } + + public static void setServerTruststore(Configuration conf, String truststore) + { + conf.set(SERVER_TRUSTSTORE, truststore); + } + + public static void setServerTruststorePassword(Configuration conf, String truststorePass) + { + conf.set(SERVER_TRUSTSTORE_PASSWORD, truststorePass); + } + + public static void setServerCipherSuites(Configuration conf, String cipherSuites) + { + conf.set(SERVER_CIPHER_SUITES, cipherSuites); + } + + public static int getStoragePort(Configuration conf) + { + return conf.getInt(OUTPUT_CQL_STORAGE_PORT, DEFAULT_STORAGE_PORT); + } + + public static int getSSLStoragePort(Configuration conf) + { + return conf.getInt(OUTPUT_CQL_SSL_STORAGE_PORT, DEFAULT_SSL_STORAGE_PORT); + } + + public static String getInternodeEncryption(Configuration conf) + { + return conf.get(INTERNODE_ENCRYPTION, EncryptionOptions.ServerEncryptionOptions.InternodeEncryption.none.name()); + } + + public static String getServerKeystore(Configuration conf) + { + return conf.get(SERVER_KEYSTORE); + } + + public static String getServerTruststore(Configuration conf) + { + return conf.get(SERVER_TRUSTSTORE); + } + + public static String 
getServerKeystorePassword(Configuration conf) + { + return conf.get(SERVER_KEYSTORE_PASSWORD); + } + + public static String getServerTruststorePassword(Configuration conf) + { + return conf.get(SERVER_TRUSTSTORE_PASSWORD); + } + + public static String getServerCipherSuites(Configuration conf) + { + return conf.get(SERVER_CIPHER_SUITES); + } + public static String getColumnFamilySchema(Configuration conf, String columnFamily) { String schema = conf.get(OUTPUT_CQL_SCHEMA_PREFIX + columnFamily); http://git-wip-us.apache.org/repos/asf/cassandra/blob/c7b40735/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java index e60a240..ced8aa9 100644 --- a/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java +++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java @@ -19,13 +19,16 @@ package org.apache.cassandra.hadoop.cql3; import java.io.File; import java.io.IOException; +import java.net.InetAddress; +import java.net.UnknownHostException; import java.nio.ByteBuffer; -import java.util.HashMap; +import java.util.HashSet; import java.util.List; -import java.util.Map; +import java.util.Set; import java.util.UUID; -import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.EncryptionOptions; +import org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions; import org.apache.cassandra.exceptions.InvalidRequestException; import org.apache.cassandra.hadoop.AbstractBulkRecordWriter; import org.apache.cassandra.hadoop.BulkRecordWriter; @@ -35,6 +38,9 @@ import org.apache.cassandra.io.sstable.CQLSSTableWriter; import org.apache.cassandra.io.sstable.SSTableLoader; import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.streaming.StreamState; +import 
org.apache.cassandra.thrift.ITransportFactory; +import org.apache.cassandra.tools.BulkLoader; +import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.util.Progressable; @@ -108,10 +114,7 @@ public class CqlBulkRecordWriter extends AbstractBulkRecordWriter<Object, List<B } if (loader == null) { - ExternalClient externalClient = new ExternalClient(conf); - - externalClient.addKnownCfs(keyspace, schema); - + BulkLoader.ExternalClient externalClient = getExternalClient(conf); this.loader = new SSTableLoader(outputDir, externalClient, new BulkRecordWriter.NullOutputHandler()) { @Override public void onSuccess(StreamState finalState) @@ -171,41 +174,53 @@ public class CqlBulkRecordWriter extends AbstractBulkRecordWriter<Object, List<B return dir; } - - public static class ExternalClient extends AbstractBulkRecordWriter.ExternalClient - { - private Map<String, Map<String, CFMetaData>> knownCqlCfs = new HashMap<>(); - - public ExternalClient(Configuration conf) - { - super(conf); - } - public void addKnownCfs(String keyspace, String cql) + private BulkLoader.ExternalClient getExternalClient(Configuration conf) + { + Set<InetAddress> hosts = new HashSet<InetAddress>(); + String outputAddress = ConfigHelper.getOutputInitialAddress(conf); + if (outputAddress == null) outputAddress = "localhost"; + String[] nodes = outputAddress.split(","); + for (String node : nodes) { - Map<String, CFMetaData> cfs = knownCqlCfs.get(keyspace); - - if (cfs == null) + try { - cfs = new HashMap<>(); - knownCqlCfs.put(keyspace, cfs); + hosts.add(InetAddress.getByName(node)); } - - CFMetaData metadata = CFMetaData.compile(cql, keyspace); - cfs.put(metadata.cfName, metadata); - } - - @Override - public CFMetaData getCFMetaData(String keyspace, String cfName) - { - CFMetaData metadata = super.getCFMetaData(keyspace, cfName); - if (metadata != null) + catch (UnknownHostException e) { - 
return metadata; + throw new RuntimeException(e); } - - Map<String, CFMetaData> cfs = knownCqlCfs.get(keyspace); - return cfs != null ? cfs.get(cfName) : null; } + int rpcPort = ConfigHelper.getOutputRpcPort(conf); + String username = ConfigHelper.getOutputKeyspaceUserName(conf); + String password = ConfigHelper.getOutputKeyspacePassword(conf); + ITransportFactory transportFactory = ConfigHelper.getClientTransportFactory(conf); + return new BulkLoader.ExternalClient(hosts, + rpcPort, + username, + password, + transportFactory, + CqlBulkOutputFormat.getStoragePort(conf), + CqlBulkOutputFormat.getSSLStoragePort(conf), + getServerEncryptOpt(conf)); + } + + private ServerEncryptionOptions getServerEncryptOpt(Configuration conf) + { + ServerEncryptionOptions encryptOpt = new ServerEncryptionOptions(); + String internodeEncrypt = CqlBulkOutputFormat.getInternodeEncryption(conf); + if (StringUtils.isEmpty(internodeEncrypt)) + return encryptOpt; + + encryptOpt.internode_encryption = EncryptionOptions.ServerEncryptionOptions.InternodeEncryption.valueOf(internodeEncrypt); + encryptOpt.keystore = CqlBulkOutputFormat.getServerKeystore(conf); + encryptOpt.truststore = CqlBulkOutputFormat.getServerTruststore(conf); + encryptOpt.keystore_password = CqlBulkOutputFormat.getServerKeystorePassword(conf); + encryptOpt.truststore_password = CqlBulkOutputFormat.getServerTruststorePassword(conf); + String cipherSuites = CqlBulkOutputFormat.getServerCipherSuites(conf); + if (!StringUtils.isEmpty(cipherSuites)) + encryptOpt.cipher_suites = cipherSuites.replace(" ", "").split(","); + return encryptOpt; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/c7b40735/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java index 7887085..5287bf5 100644 --- 
a/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java +++ b/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java @@ -24,18 +24,21 @@ import java.util.*; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.ColumnDefinition; +import org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions.InternodeEncryption; import org.apache.cassandra.db.BufferCell; import org.apache.cassandra.db.Cell; import org.apache.cassandra.db.composites.CellNames; import org.apache.cassandra.db.marshal.*; import org.apache.cassandra.exceptions.ConfigurationException; +import org.apache.cassandra.hadoop.AbstractBulkRecordWriter; import org.apache.cassandra.hadoop.ConfigHelper; import org.apache.cassandra.hadoop.HadoopCompat; +import org.apache.cassandra.hadoop.cql3.CqlBulkOutputFormat; import org.apache.cassandra.hadoop.cql3.CqlConfigHelper; import org.apache.cassandra.hadoop.cql3.CqlRecordReader; import org.apache.cassandra.thrift.*; -import org.apache.cassandra.thrift.ConsistencyLevel; import org.apache.cassandra.utils.*; +import org.apache.commons.lang.StringUtils; import org.apache.hadoop.mapreduce.*; import org.apache.pig.Expression; import org.apache.pig.ResourceSchema; @@ -54,6 +57,7 @@ import com.datastax.driver.core.Row; public class CqlNativeStorage extends AbstractCassandraStorage { private static final Logger logger = LoggerFactory.getLogger(CqlNativeStorage.class); + public static String BULK_OUTPUT_FORMAT = "org.apache.cassandra.hadoop.cql3.CqlBulkOutputFormat"; private int pageSize = 1000; private String columns; private String outputQuery; @@ -83,6 +87,22 @@ public class CqlNativeStorage extends AbstractCassandraStorage private String nativeSSLCipherSuites; private String inputCql; + private boolean bulkOutputFormat = false; + private String bulkCfSchema; + private String bulkInsertStatement; + private String bulkOutputLocation; + private int bulkBuffSize = -1; + private int bulkStreamThrottle = -1; + private int 
bulkMaxFailedHosts = -1; + private int storagePort = CqlBulkOutputFormat.DEFAULT_STORAGE_PORT; + private int sslStoragePort = CqlBulkOutputFormat.DEFAULT_SSL_STORAGE_PORT; + private String serverKeystore; + private String serverKeystorePassword; + private String serverTruststore; + private String serverTruststorePassword; + private String serverCipherSuites; + private String internodeEncrypt; + public CqlNativeStorage() { this(1000); @@ -386,57 +406,22 @@ public class CqlNativeStorage extends AbstractCassandraStorage return keys; } - - /** output: (((name, value), (name, value)), (value ... value), (value...value)) */ - public void putNext(Tuple t) throws IOException - { - if (t.size() < 1) - { - // simply nothing here, we can't even delete without a key - logger.warn("Empty output skipped, filter empty tuples to suppress this warning"); - return; - } - - if (t.getType(0) == DataType.TUPLE) - { - if (t.getType(1) == DataType.TUPLE) - { - Map<String, ByteBuffer> key = tupleToKeyMap((Tuple)t.get(0)); - cqlQueryFromTuple(key, t, 1); - } - else - throw new IOException("Second argument in output must be a tuple"); - } - else - throw new IOException("First argument in output must be a tuple"); - } - /** convert key tuple to key map */ private Map<String, ByteBuffer> tupleToKeyMap(Tuple t) throws IOException { Map<String, ByteBuffer> keys = new HashMap<String, ByteBuffer>(); for (int i = 0; i < t.size(); i++) { - if (t.getType(i) == DataType.TUPLE) - { - Tuple inner = (Tuple) t.get(i); - if (inner.size() == 2) - { - Object name = inner.get(0); - if (name != null) - { - keys.put(name.toString(), objToBB(inner.get(1))); - } - else - throw new IOException("Key name was empty"); - } - else - throw new IOException("Keys were not in name and value pairs"); - } - else - { + if (t.getType(i) != DataType.TUPLE) throw new IOException("keys was not a tuple"); - } + + Tuple inner = (Tuple) t.get(i); + if (inner.size() != 2) + throw new IOException("Keys were not in name and value 
pairs"); + Object name = inner.get(0); + if (name == null) + throw new IOException("Key name was empty"); + keys.put(name.toString(), objToBB(inner.get(1))); } return keys; } @@ -446,21 +431,16 @@ public class CqlNativeStorage extends AbstractCassandraStorage { for (int i = offset; i < t.size(); i++) { - if (t.getType(i) == DataType.TUPLE) - { - Tuple inner = (Tuple) t.get(i); - if (inner.size() > 0) - { - List<ByteBuffer> bindedVariables = bindedVariablesFromTuple(inner); - if (bindedVariables.size() > 0) - sendCqlQuery(key, bindedVariables); - else - throw new IOException("Missing binded variables"); - } - } - else - { + if (t.getType(i) != DataType.TUPLE) throw new IOException("Output type was not a tuple"); + + Tuple inner = (Tuple) t.get(i); + if (inner.size() > 0) + { + List<ByteBuffer> bindedVariables = bindedVariablesFromTuple(inner); + if (bindedVariables.size() <= 0) + throw new IOException("Missing binded variables"); + sendCqlQuery(key, bindedVariables); } } } @@ -561,6 +541,37 @@ public class CqlNativeStorage extends AbstractCassandraStorage return property.getProperty(PARTITION_FILTER_SIGNATURE); } + /** + * output: (((name, value), (name, value)), (value ... value), (value...value)) + * bulk output: ((value ... 
value), (value...value)) + * + * */ + public void putNext(Tuple t) throws IOException + { + if (t.size() < 1) + { + // simply nothing here, we can't even delete without a key + logger.warn("Empty output skipped, filter empty tuples to suppress this warning"); + return; + } + + if (t.getType(0) != DataType.TUPLE) + throw new IOException("First argument in output must be a tuple"); + + if (!bulkOutputFormat && t.getType(1) != DataType.TUPLE) + throw new IOException("Second argument in output must be a tuple"); + + if (bulkOutputFormat) + { + cqlQueryFromTuple(null, t, 0); + } + else + { + Map<String, ByteBuffer> key = tupleToKeyMap((Tuple)t.get(0)); + cqlQueryFromTuple(key, t, 1); + } + } + /** set read configuration settings */ public void setLocation(String location, Job job) throws IOException { @@ -688,6 +699,42 @@ public class CqlNativeStorage extends AbstractCassandraStorage ConfigHelper.setOutputColumnFamily(conf, keyspace, column_family); CqlConfigHelper.setOutputCql(conf, outputQuery); + if (bulkOutputFormat) + { + DEFAULT_OUTPUT_FORMAT = BULK_OUTPUT_FORMAT; + if (bulkCfSchema != null) + CqlBulkOutputFormat.setColumnFamilySchema(conf, column_family, bulkCfSchema); + else + throw new IOException("bulk_cf_schema is missing in input url parameter"); + if (bulkInsertStatement != null) + CqlBulkOutputFormat.setColumnFamilyInsertStatement(conf, column_family, bulkInsertStatement); + else + throw new IOException("bulk_insert_statement is missing in input url parameter"); + if (bulkOutputLocation != null) + conf.set(AbstractBulkRecordWriter.OUTPUT_LOCATION, bulkOutputLocation); + if (bulkBuffSize > 0) + conf.set(AbstractBulkRecordWriter.BUFFER_SIZE_IN_MB, String.valueOf(bulkBuffSize)); + if (bulkStreamThrottle > 0) + conf.set(AbstractBulkRecordWriter.STREAM_THROTTLE_MBITS, String.valueOf(bulkStreamThrottle)); + if (bulkMaxFailedHosts > 0) + conf.set(AbstractBulkRecordWriter.MAX_FAILED_HOSTS, String.valueOf(bulkMaxFailedHosts)); + 
CqlBulkOutputFormat.setSSLStoragePort(conf, sslStoragePort); + CqlBulkOutputFormat.setStoragePort(conf, storagePort); + if (serverEncrypted()) + { + if (!StringUtils.isEmpty(serverKeystore)) + CqlBulkOutputFormat.setServerKeystore(conf, serverKeystore); + if (!StringUtils.isEmpty(serverTruststore)) + CqlBulkOutputFormat.setServerTruststore(conf, serverTruststore); + if (!StringUtils.isEmpty(serverKeystorePassword)) + CqlBulkOutputFormat.setServerKeystorePassword(conf, serverKeystorePassword); + if (!StringUtils.isEmpty(serverTruststorePassword)) + CqlBulkOutputFormat.setServerTruststorePassword(conf, serverTruststorePassword); + if (!StringUtils.isEmpty(serverCipherSuites)) + CqlBulkOutputFormat.setServerCipherSuites(conf, serverCipherSuites); + } + } + setConnectionInformation(); if (ConfigHelper.getOutputRpcPort(conf) == 0) @@ -700,6 +747,12 @@ public class CqlNativeStorage extends AbstractCassandraStorage initSchema(storeSignature); } + private boolean serverEncrypted() + { + return !StringUtils.isEmpty(internodeEncrypt) && + InternodeEncryption.none != InternodeEncryption.valueOf(internodeEncrypt.toLowerCase()); + } + private void setLocationFromUri(String location) throws IOException { try @@ -720,6 +773,37 @@ public class CqlNativeStorage extends AbstractCassandraStorage if (urlQuery.containsKey("output_query")) outputQuery = urlQuery.get("output_query"); + if (urlQuery.containsKey("bulk_output_format")) + bulkOutputFormat = Boolean.valueOf(urlQuery.get("bulk_output_format")); + if (urlQuery.containsKey("bulk_cf_schema")) + bulkCfSchema = urlQuery.get("bulk_cf_schema"); + if (urlQuery.containsKey("bulk_insert_statement")) + bulkInsertStatement = urlQuery.get("bulk_insert_statement"); + if (urlQuery.containsKey("bulk_output_location")) + bulkOutputLocation = urlQuery.get("bulk_output_location"); + if (urlQuery.containsKey("bulk_buff_size")) + bulkBuffSize = Integer.valueOf(urlQuery.get("bulk_buff_size")); + if (urlQuery.containsKey("bulk_stream_throttle")) + 
bulkStreamThrottle = Integer.valueOf(urlQuery.get("bulk_stream_throttle")); + if (urlQuery.containsKey("bulk_max_failed_hosts")) + bulkMaxFailedHosts = Integer.valueOf(urlQuery.get("bulk_max_failed_hosts")); + if (urlQuery.containsKey("storage_port")) + storagePort = Integer.valueOf(urlQuery.get("storage_port")); + if (urlQuery.containsKey("ssl_storage_port")) + sslStoragePort = Integer.valueOf(urlQuery.get("ssl_storage_port")); + if (urlQuery.containsKey("internode_encrypt")) + internodeEncrypt = urlQuery.get("internode_encrypt"); + if (urlQuery.containsKey("server_keystore")) + serverKeystore = urlQuery.get("server_keystore"); + if (urlQuery.containsKey("server_truststore")) + serverTruststore = urlQuery.get("server_truststore"); + if (urlQuery.containsKey("server_keystore_pass")) + serverKeystorePassword = urlQuery.get("server_keystore_pass"); + if (urlQuery.containsKey("server_truststore_pass")) + serverTruststorePassword = urlQuery.get("server_truststore_pass"); + if (urlQuery.containsKey("server_cipher_suites")) + serverCipherSuites = urlQuery.get("server_cipher_suites"); + //split size if (urlQuery.containsKey("split_size")) splitSize = Integer.parseInt(urlQuery.get("split_size")); @@ -804,8 +888,15 @@ public class CqlNativeStorage extends AbstractCassandraStorage "[&keep_alive=<keep_alive>][&auth_provider=<auth_provider>][&trust_store_path=<trust_store_path>]" + "[&key_store_path=<key_store_path>][&trust_store_password=<trust_store_password>]" + "[&key_store_password=<key_store_password>][&cipher_suites=<cipher_suites>][&input_cql=<input_cql>]" + - "[columns=<columns>][where_clause=<where_clause>]]': " + e.getMessage()); - } + "[columns=<columns>][where_clause=<where_clause>]" + + "[&bulk_cf_schema=bulk_cf_schema][&bulk_insert_statement=bulk_insert_statement]" + + "[&bulk_output_location=<bulk_output_location>][&bulk_buff_size=<bulk_buff_size>]" + + "[&storage_port=<storage_port>][&ssl_storage_port=<ssl_storage_port>]" + + 
"[&server_keystore=<server_keystore>][&server_keystore_pass=<server_keystore_pass>]" + + "[&server_truststore=<server_truststore>][&server_truststore_pass=<server_truststore_pass>]" + + "[&server_cipher_suites=<server_cipher_suites>][&internode_encrypt=<internode_encrypt>]" + + "[&bulk_stream_throttle=<bulk_stream_throttle>][&bulk_max_failed_hosts=<bulk_max_failed_hosts>]]': " + e.getMessage()); + } } /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/c7b40735/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java index c7277fa..66583ec 100644 --- a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java +++ b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java @@ -38,4 +38,3 @@ public class CqlStorage extends CqlNativeStorage super(pageSize); } } - http://git-wip-us.apache.org/repos/asf/cassandra/blob/c7b40735/src/java/org/apache/cassandra/tools/BulkLoader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/BulkLoader.java b/src/java/org/apache/cassandra/tools/BulkLoader.java index 88a4404..f4b30cb 100644 --- a/src/java/org/apache/cassandra/tools/BulkLoader.java +++ b/src/java/org/apache/cassandra/tools/BulkLoader.java @@ -254,7 +254,7 @@ public class BulkLoader } } - static class ExternalClient extends SSTableLoader.Client + public static class ExternalClient extends SSTableLoader.Client { private final Map<String, CFMetaData> knownCfs = new HashMap<>(); private final Set<InetAddress> hosts; http://git-wip-us.apache.org/repos/asf/cassandra/blob/c7b40735/test/conf/cassandra.yaml ---------------------------------------------------------------------- diff --git a/test/conf/cassandra.yaml b/test/conf/cassandra.yaml index ec988e2..7be72dd 100644 --- a/test/conf/cassandra.yaml +++ 
b/test/conf/cassandra.yaml @@ -10,6 +10,7 @@ commitlog_segment_size_in_mb: 5 partitioner: org.apache.cassandra.dht.ByteOrderedPartitioner listen_address: 127.0.0.1 storage_port: 7010 +ssl_storage_port: 7011 rpc_port: 9170 start_native_transport: true native_transport_port: 9042 http://git-wip-us.apache.org/repos/asf/cassandra/blob/c7b40735/test/pig/org/apache/cassandra/pig/CqlTableTest.java ---------------------------------------------------------------------- diff --git a/test/pig/org/apache/cassandra/pig/CqlTableTest.java b/test/pig/org/apache/cassandra/pig/CqlTableTest.java index 4ca043d..2e1758e 100644 --- a/test/pig/org/apache/cassandra/pig/CqlTableTest.java +++ b/test/pig/org/apache/cassandra/pig/CqlTableTest.java @@ -53,6 +53,10 @@ public class CqlTableTest extends PigTestBase "CREATE INDEX test_b on test (b);", "CREATE TABLE moredata (x int PRIMARY KEY, y int);", + "CREATE TABLE test_bulk (a int PRIMARY KEY, b int);", + "INSERT INTO test_bulk (a,b) VALUES (1,1);", + "INSERT INTO test_bulk (a,b) VALUES (2,2);", + "INSERT INTO test_bulk (a,b) VALUES (3,3);", "INSERT INTO test (a,b) VALUES (1,1);", "INSERT INTO test (a,b) VALUES (2,2);", "INSERT INTO test (a,b) VALUES (3,3);", @@ -160,10 +164,13 @@ public class CqlTableTest extends PigTestBase //input_cql=select * from test where token(a) > ? and token(a) <= ? pig.registerQuery("result= LOAD 'cql://cql3ks/test?" 
+ defaultParameters + nativeParameters + "&input_cql=select%20*%20from%20test%20where%20token(a)%20%3E%20%3F%20and%20token(a)%20%3C%3D%20%3F' USING CqlNativeStorage();"); Iterator<Tuple> it = pig.openIterator("result"); + int count = 0; while (it.hasNext()) { Tuple t = it.next(); Assert.assertEquals(t.get(0), t.get(1)); + count ++; } + Assert.assertEquals(6, count); } @Test @@ -310,4 +317,33 @@ public class CqlTableTest extends PigTestBase Assert.fail("Can't fetch any data"); } } + + @Test + public void testCqlStorageSingleKeyTableBulkLoad() + throws AuthenticationException, AuthorizationException, InvalidRequestException, UnavailableException, TimedOutException, TException, NotFoundException, SchemaDisagreementException, IOException + { + pig.setBatchOn(); + //input_cql=select * from moredata where token(x) > ? and token(x) <= ? + pig.registerQuery("moretestvalues= LOAD 'cql://cql3ks/moredata?" + defaultParameters + nativeParameters + "&input_cql=select%20*%20from%20moredata%20where%20token(x)%20%3E%20%3F%20and%20token(x)%20%3C%3D%20%3F' USING CqlNativeStorage();"); + pig.registerQuery("insertformat= FOREACH moretestvalues GENERATE TOTUPLE(x, y);"); + pig.registerQuery("STORE insertformat INTO 'cql://cql3ks/test_bulk?" + defaultParameters + nativeParameters + "&bulk_output_format=true&bulk_cf_schema=CREATE%20TABLE%20cql3ks.test_bulk%20(a%20int%20PRIMARY%20KEY%2C%20b%20int)&bulk_insert_statement=Insert%20into%20cql3ks.test_bulk(a%2C%20b)%20values(%3F%2C%3F)' USING CqlNativeStorage();"); + pig.executeBatch(); + + //(5,5) + //(6,6) + //(4,4) + //(2,2) + //(3,3) + //(1,1) + //input_cql=select * from test_bulk1 where token(a) > ? and token(a) <= ? + pig.registerQuery("result= LOAD 'cql://cql3ks/test_bulk?" 
+ defaultParameters + nativeParameters + "&input_cql=select%20*%20from%20test_bulk%20where%20token(a)%20%3E%20%3F%20and%20token(a)%20%3C%3D%20%3F' USING CqlNativeStorage();"); + Iterator<Tuple> it = pig.openIterator("result"); + int count = 0; + while (it.hasNext()) { + Tuple t = it.next(); + Assert.assertEquals(t.get(0), t.get(1)); + count ++; + } + Assert.assertEquals(6, count); + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/c7b40735/test/pig/org/apache/cassandra/pig/PigTestBase.java ---------------------------------------------------------------------- diff --git a/test/pig/org/apache/cassandra/pig/PigTestBase.java b/test/pig/org/apache/cassandra/pig/PigTestBase.java index 4b3e422..e6964f8 100644 --- a/test/pig/org/apache/cassandra/pig/PigTestBase.java +++ b/test/pig/org/apache/cassandra/pig/PigTestBase.java @@ -65,7 +65,8 @@ public class PigTestBase extends SchemaLoader protected static Configuration conf; protected static MiniCluster cluster; protected static PigServer pig; - protected static String defaultParameters= "init_address=localhost&rpc_port=9170&partitioner=org.apache.cassandra.dht.ByteOrderedPartitioner"; + protected static String defaultParameters= "init_address=localhost&rpc_port=9170&partitioner=org.apache.cassandra.dht.ByteOrderedPartitioner" + + "&storage_port=7010&ssl_storage_port=7011&internode_encrypt=NONE"; protected static String nativeParameters = "&core_conns=2&max_conns=10&min_simult_reqs=3&max_simult_reqs=10&native_timeout=10000000" + "&native_read_timeout=10000000&send_buff_size=4096&receive_buff_size=4096&solinger=3" + "&tcp_nodelay=true&reuse_address=true&keep_alive=true&native_port=9042";
