Merge branch 'cassandra-2.1' into cassandra-2.2
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/cd4a1e6a Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/cd4a1e6a Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/cd4a1e6a Branch: refs/heads/cassandra-2.2 Commit: cd4a1e6acc51b0f708127a50d37dd7832bbe8dfa Parents: 9791796 c7b4073 Author: Aleksey Yeschenko <alek...@apache.org> Authored: Tue Sep 15 16:12:45 2015 +0100 Committer: Aleksey Yeschenko <alek...@apache.org> Committed: Tue Sep 15 16:12:45 2015 +0100 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../hadoop/AbstractColumnFamilyInputFormat.java | 2 + .../hadoop/cql3/CqlBulkRecordWriter.java | 13 +- .../cassandra/hadoop/pig/CqlNativeStorage.java | 171 ++++++++++++------- test/conf/cassandra_pig.yaml | 41 +++++ .../org/apache/cassandra/pig/CqlTableTest.java | 35 ++++ .../org/apache/cassandra/pig/PigTestBase.java | 3 +- 7 files changed, 205 insertions(+), 61 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/cd4a1e6a/CHANGES.txt ---------------------------------------------------------------------- diff --cc CHANGES.txt index bd24781,5f11049..b0ade42 --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -1,17 -1,8 +1,18 @@@ -2.1.10 +2.2.2 + * Defer default role manager setup until all nodes are on 2.2+ (CASSANDRA-9761) + * Cancel transaction for sstables we wont redistribute index summary + for (CASSANDRA-10270) + * Handle missing RoleManager in config after upgrade to 2.2 (CASSANDRA-10209) + * Retry snapshot deletion after compaction and gc on Windows (CASSANDRA-10222) + * Fix failure to start with space in directory path on Windows (CASSANDRA-10239) + * Fix repair hang when snapshot failed (CASSANDRA-10057) + * Fall back to 1/4 commitlog volume for commitlog_total_space on small disks + (CASSANDRA-10199) +Merged from 2.1: + * (Pig) support BulkOutputFormat as a URL parameter (CASSANDRA-7410) * BATCH statement is broken in cqlsh (CASSANDRA-10272) * Added configurable warning threshold for GC duration (CASSANDRA-8907) - * (cqlsh) Make cqlsh PEP8 compliant (CASSANDRA-10066) + * (cqlsh) Make cqlsh PEP8 Compliant (CASSANDRA-10066) * (cqlsh) Fix error when starting cqlsh with --debug (CASSANDRA-10282) * Scrub, Cleanup and Upgrade do not unmark compacting until all operations have completed, regardless of the occurence of exceptions (CASSANDRA-10274) http://git-wip-us.apache.org/repos/asf/cassandra/blob/cd4a1e6a/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java index 4dd53ff,e8de0f2..9c45bfe --- a/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java +++ b/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java @@@ -24,23 -32,31 +24,24 @@@ import java.util.concurrent.* import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.cassandra.auth.IAuthenticator; +import com.datastax.driver.core.Host; +import com.datastax.driver.core.Metadata; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.Row; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.TokenRange; ++ +import org.apache.cassandra.db.SystemKeyspace; +import org.apache.cassandra.dht.ByteOrderedPartitioner; import org.apache.cassandra.dht.IPartitioner; +import org.apache.cassandra.dht.OrderPreservingPartitioner; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; -import org.apache.cassandra.thrift.AuthenticationRequest; -import org.apache.cassandra.thrift.Cassandra; -import org.apache.cassandra.thrift.CfSplit; -import org.apache.cassandra.thrift.InvalidRequestException; +import org.apache.cassandra.hadoop.cql3.*; import org.apache.cassandra.thrift.KeyRange; -import org.apache.cassandra.thrift.TokenRange; -import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapreduce.InputFormat; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.JobContext; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.mapreduce.TaskAttemptID; -import org.apache.thrift.TApplicationException; -import org.apache.thrift.TException; -import org.apache.thrift.protocol.TBinaryProtocol; -import org.apache.thrift.protocol.TProtocol; -import org.apache.thrift.transport.TTransport; -import org.apache.thrift.transport.TTransportException; - +import org.apache.hadoop.mapreduce.*; public abstract class AbstractColumnFamilyInputFormat<K, Y> extends InputFormat<K, Y> implements org.apache.hadoop.mapred.InputFormat<K, Y> { @@@ -226,64 -271,83 +227,65 @@@ } } - private List<CfSplit> getSubSplits(String keyspace, String cfName, TokenRange range, Configuration conf) throws IOException + private Map<TokenRange, Long> getSubSplits(String keyspace, String cfName, TokenRange range, Configuration conf) throws IOException { - int splitsize = ConfigHelper.getInputSplitSize(conf); - for (int i = 0; i < range.rpc_endpoints.size(); i++) + int splitSize = ConfigHelper.getInputSplitSize(conf); + try { - String host = range.rpc_endpoints.get(i); - - if (host == null || host.equals("0.0.0.0")) - host = range.endpoints.get(i); - - try - { - Cassandra.Client client = ConfigHelper.createConnection(conf, host, ConfigHelper.getInputRpcPort(conf)); - client.set_keyspace(keyspace); - - try - { - return client.describe_splits_ex(cfName, range.start_token, range.end_token, splitsize); - } - catch (TApplicationException e) - { - // fallback to guessing split size if talking to a server without describe_splits_ex method - if (e.getType() == TApplicationException.UNKNOWN_METHOD) - { - List<String> splitPoints = client.describe_splits(cfName, range.start_token, range.end_token, splitsize); - return tokenListToSplits(splitPoints, splitsize); - } - throw e; - } - } - catch (IOException e) - { - logger.debug("failed connect to endpoint {}", host, e); - } - catch (InvalidRequestException e) - { - throw new RuntimeException(e); - } - catch (TException e) - { - throw new RuntimeException(e); - } + return describeSplits(keyspace, cfName, range, splitSize); + } + catch (Exception e) + { + throw new RuntimeException(e); } - throw new IOException("failed connecting to all endpoints " + StringUtils.join(range.endpoints, ",")); } - private List<CfSplit> tokenListToSplits(List<String> splitTokens, int splitsize) + private Map<TokenRange, Set<Host>> getRangeMap(Configuration conf, String keyspace) { - List<CfSplit> splits = Lists.newArrayListWithExpectedSize(splitTokens.size() - 1); - for (int j = 0; j < splitTokens.size() - 1; j++) - splits.add(new CfSplit(splitTokens.get(j), splitTokens.get(j + 1), splitsize)); - return splits; + try (Session session = CqlConfigHelper.getInputCluster(ConfigHelper.getInputInitialAddress(conf).split(","), conf).connect()) + { + Map<TokenRange, Set<Host>> map = new HashMap<>(); + Metadata metadata = session.getCluster().getMetadata(); + for (TokenRange tokenRange : metadata.getTokenRanges()) + map.put(tokenRange, metadata.getReplicas('"' + keyspace + '"', tokenRange)); + return map; + } } - private List<TokenRange> getRangeMap(Configuration conf) throws IOException + private Map<TokenRange, Long> describeSplits(String keyspace, String table, TokenRange tokenRange, int splitSize) { - Cassandra.Client client = ConfigHelper.getClientFromInputAddressList(conf); - - List<TokenRange> map; - try + String query = String.format("SELECT mean_partition_size, partitions_count " + + "FROM %s.%s " + + "WHERE keyspace_name = ? AND table_name = ? AND range_start = ? AND range_end = ?", + SystemKeyspace.NAME, + SystemKeyspace.SIZE_ESTIMATES); + + ResultSet resultSet = session.execute(query, keyspace, table, tokenRange.getStart().toString(), tokenRange.getEnd().toString()); + + Row row = resultSet.one(); + // If we have no data on this split, return the full split i.e., do not sub-split + // Assume smallest granularity of partition count available from CASSANDRA-7688 + if (row == null) { - map = client.describe_local_ring(ConfigHelper.getInputKeyspace(conf)); + Map<TokenRange, Long> wrappedTokenRange = new HashMap<>(); + wrappedTokenRange.put(tokenRange, (long) 128); + return wrappedTokenRange; } - catch (InvalidRequestException e) - { - throw new RuntimeException(e); - } - catch (TException e) - { - throw new RuntimeException(e); - } - return map; + + long meanPartitionSize = row.getLong("mean_partition_size"); + long partitionCount = row.getLong("partitions_count"); + + int splitCount = (int)((meanPartitionSize * partitionCount) / splitSize); ++ if (splitCount <= 0) splitCount = 1; + List<TokenRange> splitRanges = tokenRange.splitEvenly(splitCount); + Map<TokenRange, Long> rangesWithLength = new HashMap<>(); + for (TokenRange range : splitRanges) + rangesWithLength.put(range, partitionCount/splitCount); + + return rangesWithLength; } - // // Old Hadoop API - // public org.apache.hadoop.mapred.InputSplit[] getSplits(JobConf jobConf, int numSplits) throws IOException { TaskAttemptContext tac = HadoopCompat.newTaskAttemptContext(jobConf, new TaskAttemptID()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/cd4a1e6a/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java index e77c4c8,ced8aa9..9e6e23b --- a/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java +++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java @@@ -23,16 -22,15 +23,17 @@@ import java.io.IOException import java.net.InetAddress; import java.net.UnknownHostException; import java.nio.ByteBuffer; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.UUID; +import java.util.*; +import java.util.concurrent.*; -import org.apache.cassandra.config.EncryptionOptions; -import org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; - +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.Config; +import org.apache.cassandra.config.DatabaseDescriptor; ++import org.apache.cassandra.dht.IPartitioner; ++import org.apache.cassandra.dht.Murmur3Partitioner; import org.apache.cassandra.exceptions.InvalidRequestException; -import org.apache.cassandra.hadoop.AbstractBulkRecordWriter; import org.apache.cassandra.hadoop.BulkRecordWriter; import org.apache.cassandra.hadoop.ConfigHelper; import org.apache.cassandra.hadoop.HadoopCompat; @@@ -84,6 -67,6 +85,7 @@@ public class CqlBulkRecordWriter extend private String insertStatement; private File outputDir; private boolean deleteSrc; ++ private IPartitioner partitioner; CqlBulkRecordWriter(TaskAttemptContext context) throws IOException { @@@ -113,55 -90,45 +115,64 @@@ { // if anything is missing, exceptions will be thrown here, instead of on write() keyspace = ConfigHelper.getOutputKeyspace(conf); - columnFamily = ConfigHelper.getOutputColumnFamily(conf); - schema = CqlBulkOutputFormat.getColumnFamilySchema(conf, columnFamily); - insertStatement = CqlBulkOutputFormat.getColumnFamilyInsertStatement(conf, columnFamily); - outputDir = getColumnFamilyDirectory(); + table = ConfigHelper.getOutputColumnFamily(conf); + + // check if table is aliased + String aliasedCf = CqlBulkOutputFormat.getTableForAlias(conf, table); + if (aliasedCf != null) + table = aliasedCf; + + schema = CqlBulkOutputFormat.getTableSchema(conf, table); + insertStatement = CqlBulkOutputFormat.getTableInsertStatement(conf, table); + outputDir = getTableDirectory(); deleteSrc = CqlBulkOutputFormat.getDeleteSourceOnSuccess(conf); ++ try ++ { ++ partitioner = ConfigHelper.getInputPartitioner(conf); ++ } ++ catch (Exception e) ++ { ++ partitioner = Murmur3Partitioner.instance; ++ } + } + + protected String getOutputLocation() throws IOException + { + String dir = conf.get(OUTPUT_LOCATION, System.getProperty("java.io.tmpdir")); + if (dir == null) + throw new IOException("Output directory not defined, if hadoop is not setting java.io.tmpdir then define " + OUTPUT_LOCATION); + return dir; } - private void prepareWriter() throws IOException { - try + if (writer == null) { - if (writer == null) - { - writer = CQLSSTableWriter.builder() - .forTable(schema) - .using(insertStatement) - .withPartitioner(ConfigHelper.getOutputPartitioner(conf)) - .inDirectory(outputDir) - .withBufferSizeInMB(Integer.parseInt(conf.get(BUFFER_SIZE_IN_MB, "64"))) - .build(); - } - if (loader == null) - { - BulkLoader.ExternalClient externalClient = getExternalClient(conf); - this.loader = new SSTableLoader(outputDir, externalClient, new BulkRecordWriter.NullOutputHandler()) { - @Override - public void onSuccess(StreamState finalState) - { - if (deleteSrc) - FileUtils.deleteRecursive(outputDir); - } - }; - } + writer = CQLSSTableWriter.builder() + .forTable(schema) + .using(insertStatement) + .withPartitioner(ConfigHelper.getOutputPartitioner(conf)) + .inDirectory(outputDir) + .withBufferSizeInMB(Integer.parseInt(conf.get(BUFFER_SIZE_IN_MB, "64"))) ++ .withPartitioner(partitioner) + .build(); } - catch (Exception e) + + if (loader == null) { - throw new IOException(e); - } + ExternalClient externalClient = new ExternalClient(conf); + externalClient.setTableMetadata(CFMetaData.compile(schema, keyspace)); + + loader = new SSTableLoader(outputDir, externalClient, new BulkRecordWriter.NullOutputHandler()) + { + @Override + public void onSuccess(StreamState finalState) + { + if (deleteSrc) + FileUtils.deleteRecursive(outputDir); + } + }; + } } /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/cd4a1e6a/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java index dc3c174,5287bf5..223a848 --- a/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java +++ b/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java @@@ -17,71 -17,47 +17,75 @@@ */ package org.apache.cassandra.hadoop.pig; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.Serializable; +import java.io.UnsupportedEncodingException; +import java.net.URLDecoder; import java.nio.ByteBuffer; -import java.nio.charset.CharacterCodingException; import java.util.*; -import org.apache.cassandra.config.CFMetaData; -import org.apache.cassandra.config.ColumnDefinition; -import org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions.InternodeEncryption; -import org.apache.cassandra.db.BufferCell; -import org.apache.cassandra.db.Cell; -import org.apache.cassandra.db.composites.CellNames; +import com.datastax.driver.core.ColumnMetadata; +import com.datastax.driver.core.Metadata; +import com.datastax.driver.core.Row; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.TableMetadata; +import com.datastax.driver.core.exceptions.NoHostAvailableException; ++ import org.apache.cassandra.db.marshal.*; +import org.apache.cassandra.exceptions.AuthenticationException; import org.apache.cassandra.exceptions.ConfigurationException; -import org.apache.cassandra.hadoop.AbstractBulkRecordWriter; +import org.apache.cassandra.exceptions.SyntaxException; import org.apache.cassandra.hadoop.ConfigHelper; import org.apache.cassandra.hadoop.HadoopCompat; + import org.apache.cassandra.hadoop.cql3.CqlBulkOutputFormat; ++import org.apache.cassandra.hadoop.cql3.CqlBulkRecordWriter; import org.apache.cassandra.hadoop.cql3.CqlConfigHelper; import org.apache.cassandra.hadoop.cql3.CqlRecordReader; -import org.apache.cassandra.thrift.*; +import org.apache.cassandra.serializers.CollectionSerializer; import org.apache.cassandra.utils.*; -import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.*; -import org.apache.pig.Expression; -import org.apache.pig.ResourceSchema; +import org.apache.pig.*; import org.apache.pig.Expression.OpType; import org.apache.pig.ResourceSchema.ResourceFieldSchema; import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit; import org.apache.pig.data.*; import org.apache.pig.impl.util.UDFContext; -import org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.yaml.snakeyaml.external.biz.base64Coder.Base64Coder; -import com.datastax.driver.core.Row; -public class CqlNativeStorage extends AbstractCassandraStorage +public class CqlNativeStorage extends LoadFunc implements StoreFuncInterface, LoadMetadata { + protected String DEFAULT_INPUT_FORMAT; + protected String DEFAULT_OUTPUT_FORMAT; + + protected String username; + protected String password; + protected String keyspace; + protected String column_family; + protected String loadSignature; + protected String storeSignature; + + protected Configuration conf; + protected String inputFormatClass; + protected String outputFormatClass; + protected int splitSize = 64 * 1024; + protected String partitionerClass; + protected boolean usePartitionFilter = false; + protected String initHostAddress; + protected String rpcPort; + protected int nativeProtocolVersion = 1; + private static final Logger logger = LoggerFactory.getLogger(CqlNativeStorage.class); - public static String BULK_OUTPUT_FORMAT = "org.apache.cassandra.hadoop.cql3.CqlBulkOutputFormat"; ++ private static String BULK_OUTPUT_FORMAT = "org.apache.cassandra.hadoop.cql3.CqlBulkOutputFormat"; private int pageSize = 1000; private String columns; private String outputQuery; @@@ -109,6 -87,22 +113,16 @@@ private String nativeSSLCipherSuites; private String inputCql; + private boolean bulkOutputFormat = false; + private String bulkCfSchema; + private String bulkInsertStatement; + private String bulkOutputLocation; + private int bulkBuffSize = -1; + private int bulkStreamThrottle = -1; + private int bulkMaxFailedHosts = -1; - private int storagePort = CqlBulkOutputFormat.DEFAULT_STORAGE_PORT; - private int sslStoragePort = CqlBulkOutputFormat.DEFAULT_SSL_STORAGE_PORT; - private String serverKeystore; - private String serverKeystorePassword; - private String serverTruststore; - private String serverTruststorePassword; - private String serverCipherSuites; - private String internodeEncrypt; ++ private boolean bulkDeleteSourceOnSuccess = true; ++ private String bulkTableAlias; + public CqlNativeStorage() { this(1000); @@@ -241,83 -240,188 +255,48 @@@ return obj; } - /** include key columns */ - protected List<ColumnDef> getColumnMetadata(Cassandra.Client client) - throws InvalidRequestException, - UnavailableException, - TimedOutException, - SchemaDisagreementException, - TException, - CharacterCodingException, - org.apache.cassandra.exceptions.InvalidRequestException, - ConfigurationException, - NotFoundException - { - List<ColumnDef> keyColumns = null; - // get key columns + /** get the columnfamily definition for the signature */ + protected TableInfo getCfInfo(String signature) throws IOException + { + UDFContext context = UDFContext.getUDFContext(); + Properties property = context.getUDFProperties(CqlNativeStorage.class); + TableInfo cfInfo; try { - keyColumns = getKeysMeta(client); + cfInfo = cfdefFromString(property.getProperty(signature)); } - catch(Exception e) + catch (ClassNotFoundException e) { - logger.error("Error in retrieving key columns" , e); + throw new IOException(e); } - - // get other columns - List<ColumnDef> columns = getColumnMeta(client, false, !hasCompactValueAlias); - - // combine all columns in a list - if (keyColumns != null && columns != null) - keyColumns.addAll(columns); - - return keyColumns; + return cfInfo; } - /** get keys meta data */ - private List<ColumnDef> getKeysMeta(Cassandra.Client client) - throws Exception + /** return the CfInfo for the column family */ + protected TableMetadata getCfInfo(Session client) + throws NoHostAvailableException, + AuthenticationException, + IllegalStateException { - String query = "SELECT key_aliases, " + - " column_aliases, " + - " key_validator, " + - " comparator, " + - " keyspace_name, " + - " value_alias, " + - " default_validator " + - "FROM system.schema_columnfamilies " + - "WHERE keyspace_name = '%s'" + - " AND columnfamily_name = '%s' "; - - CqlResult result = client.execute_cql3_query( - ByteBufferUtil.bytes(String.format(query, keyspace, column_family)), - Compression.NONE, - ConsistencyLevel.ONE); - - if (result == null || result.rows == null || result.rows.isEmpty()) - return null; - - Iterator<CqlRow> iteraRow = result.rows.iterator(); - List<ColumnDef> keys = new ArrayList<ColumnDef>(); - if (iteraRow.hasNext()) - { - CqlRow cqlRow = iteraRow.next(); - String name = ByteBufferUtil.string(cqlRow.columns.get(4).value); - logger.debug("Found ksDef name: {}", name); - String keyString = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(0).getValue())); - - logger.debug("partition keys: {}", keyString); - List<String> keyNames = FBUtilities.fromJsonList(keyString); - - Iterator<String> iterator = keyNames.iterator(); - while (iterator.hasNext()) - { - ColumnDef cDef = new ColumnDef(); - cDef.name = ByteBufferUtil.bytes(iterator.next()); - keys.add(cDef); - } - // classic thrift tables - if (keys.size() == 0) - { - CFMetaData cfm = getCFMetaData(keyspace, column_family, client); - for (ColumnDefinition def : cfm.partitionKeyColumns()) - { - String key = def.name.toString(); - logger.debug("name: {} ", key); - ColumnDef cDef = new ColumnDef(); - cDef.name = ByteBufferUtil.bytes(key); - keys.add(cDef); - } - for (ColumnDefinition def : cfm.clusteringColumns()) - { - String key = def.name.toString(); - logger.debug("name: {} ", key); - ColumnDef cDef = new ColumnDef(); - cDef.name = ByteBufferUtil.bytes(key); - keys.add(cDef); - } - } - - keyString = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(1).getValue())); - - logger.debug("cluster keys: {}", keyString); - keyNames = FBUtilities.fromJsonList(keyString); - - iterator = keyNames.iterator(); - while (iterator.hasNext()) - { - ColumnDef cDef = new ColumnDef(); - cDef.name = ByteBufferUtil.bytes(iterator.next()); - keys.add(cDef); - } - - String validator = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(2).getValue())); - logger.debug("row key validator: {}", validator); - AbstractType<?> keyValidator = parseType(validator); - - Iterator<ColumnDef> keyItera = keys.iterator(); - if (keyValidator instanceof CompositeType) - { - Iterator<AbstractType<?>> typeItera = ((CompositeType) keyValidator).types.iterator(); - while (typeItera.hasNext()) - keyItera.next().validation_class = typeItera.next().toString(); - } - else - keyItera.next().validation_class = keyValidator.toString(); - - validator = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(3).getValue())); - logger.debug("cluster key validator: {}", validator); - - if (keyItera.hasNext() && validator != null && !validator.isEmpty()) - { - AbstractType<?> clusterKeyValidator = parseType(validator); - - if (clusterKeyValidator instanceof CompositeType) - { - Iterator<AbstractType<?>> typeItera = ((CompositeType) clusterKeyValidator).types.iterator(); - while (keyItera.hasNext()) - keyItera.next().validation_class = typeItera.next().toString(); - } - else - keyItera.next().validation_class = clusterKeyValidator.toString(); - } - - // compact value_alias column - if (cqlRow.columns.get(5).value != null) - { - try - { - String compactValidator = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(6).getValue())); - logger.debug("default validator: {}", compactValidator); - AbstractType<?> defaultValidator = parseType(compactValidator); - - ColumnDef cDef = new ColumnDef(); - cDef.name = cqlRow.columns.get(5).value; - cDef.validation_class = defaultValidator.toString(); - keys.add(cDef); - hasCompactValueAlias = true; - } - catch (Exception e) - { - // no compact column at value_alias - } - } - - } - return keys; + // get CF meta data + return client.getCluster().getMetadata().getKeyspace(Metadata.quote(keyspace)).getTable(Metadata.quote(column_family)); } - /** output: (((name, value), (name, value)), (value ... value), (value...value)) */ - public void putNext(Tuple t) throws IOException - { - if (t.size() < 1) - { - // simply nothing here, we can't even delete without a key - logger.warn("Empty output skipped, filter empty tuples to suppress this warning"); - return; - } - - if (t.getType(0) == DataType.TUPLE) - { - if (t.getType(1) == DataType.TUPLE) - { - Map<String, ByteBuffer> key = tupleToKeyMap((Tuple)t.get(0)); - cqlQueryFromTuple(key, t, 1); - } - else - throw new IOException("Second argument in output must be a tuple"); - } - else - throw new IOException("First argument in output must be a tuple"); - } - /** convert key tuple to key map */ private Map<String, ByteBuffer> tupleToKeyMap(Tuple t) throws IOException { Map<String, ByteBuffer> keys = new HashMap<String, ByteBuffer>(); for (int i = 0; i < t.size(); i++) { - if (t.getType(i) == DataType.TUPLE) - { - Tuple inner = (Tuple) t.get(i); - if (inner.size() == 2) - { - Object name = inner.get(0); - if (name != null) - { - keys.put(name.toString(), objToBB(inner.get(1))); - } - else - throw new IOException("Key name was empty"); - } - else - throw new IOException("Keys were not in name and value pairs"); - } - else - { + if (t.getType(i) != DataType.TUPLE) throw new IOException("keys was not a tuple"); - } - + Tuple inner = (Tuple) t.get(i); + if (inner.size() != 2) + throw new IOException("Keys were not in name and value pairs"); + Object name = inner.get(0); + if (name == null) + throw new IOException("Key name was empty"); + keys.put(name.toString(), objToBB(inner.get(1))); } return keys; } @@@ -543,10 -537,41 +517,41 @@@ private String getWhereClauseForPartitionFilter() { UDFContext context = UDFContext.getUDFContext(); - Properties property = context.getUDFProperties(AbstractCassandraStorage.class); - return property.getProperty(PARTITION_FILTER_SIGNATURE); + Properties property = context.getUDFProperties(CqlNativeStorage.class); + return property.getProperty(StorageHelper.PARTITION_FILTER_SIGNATURE); } + /** + * output: (((name, value), (name, value)), (value ... value), (value...value)) + * bulk output: ((value ... value), (value...value)) + * + * */ + public void putNext(Tuple t) throws IOException + { + if (t.size() < 1) + { + // simply nothing here, we can't even delete without a key + logger.warn("Empty output skipped, filter empty tuples to suppress this warning"); + return; + } + + if (t.getType(0) != DataType.TUPLE) + throw new IOException("First argument in output must be a tuple"); + + if (!bulkOutputFormat && t.getType(1) != DataType.TUPLE) + throw new IOException("Second argument in output must be a tuple"); + + if (bulkOutputFormat) + { + cqlQueryFromTuple(null, t, 0); + } + else + { + Map<String, ByteBuffer> key = tupleToKeyMap((Tuple)t.get(0)); + cqlQueryFromTuple(key, t, 1); + } + } + /** set read configuration settings */ public void setLocation(String location, Job job) throws IOException { @@@ -672,6 -699,42 +677,32 @@@ ConfigHelper.setOutputColumnFamily(conf, keyspace, column_family); CqlConfigHelper.setOutputCql(conf, outputQuery); + if (bulkOutputFormat) + { + DEFAULT_OUTPUT_FORMAT = BULK_OUTPUT_FORMAT; + if (bulkCfSchema != null) - CqlBulkOutputFormat.setColumnFamilySchema(conf, column_family, bulkCfSchema); ++ CqlBulkOutputFormat.setTableSchema(conf, column_family, bulkCfSchema); + else + throw new IOException("bulk_cf_schema is missing in input url parameter"); + if (bulkInsertStatement != null) - CqlBulkOutputFormat.setColumnFamilyInsertStatement(conf, column_family, bulkInsertStatement); ++ CqlBulkOutputFormat.setTableInsertStatement(conf, column_family, bulkInsertStatement); + else + throw new IOException("bulk_insert_statement is missing in input url parameter"); ++ if (bulkTableAlias != null) ++ CqlBulkOutputFormat.setTableAlias(conf, bulkTableAlias, column_family); ++ CqlBulkOutputFormat.setDeleteSourceOnSuccess(conf, bulkDeleteSourceOnSuccess); + if (bulkOutputLocation != null) - conf.set(AbstractBulkRecordWriter.OUTPUT_LOCATION, bulkOutputLocation); ++ conf.set(CqlBulkRecordWriter.OUTPUT_LOCATION, bulkOutputLocation); + if (bulkBuffSize > 0) - conf.set(AbstractBulkRecordWriter.BUFFER_SIZE_IN_MB, String.valueOf(bulkBuffSize)); ++ conf.set(CqlBulkRecordWriter.BUFFER_SIZE_IN_MB, String.valueOf(bulkBuffSize)); + if (bulkStreamThrottle > 0) - conf.set(AbstractBulkRecordWriter.STREAM_THROTTLE_MBITS, String.valueOf(bulkStreamThrottle)); ++ conf.set(CqlBulkRecordWriter.STREAM_THROTTLE_MBITS, String.valueOf(bulkStreamThrottle)); + if (bulkMaxFailedHosts > 0) - conf.set(AbstractBulkRecordWriter.MAX_FAILED_HOSTS, String.valueOf(bulkMaxFailedHosts)); - CqlBulkOutputFormat.setSSLStoragePort(conf, sslStoragePort); - CqlBulkOutputFormat.setStoragePort(conf, storagePort); - if (serverEncrypted()) - { - if (!StringUtils.isEmpty(serverKeystore)) - CqlBulkOutputFormat.setServerKeystore(conf, serverKeystore); - if (!StringUtils.isEmpty(serverTruststore)) - CqlBulkOutputFormat.setServerTruststore(conf, serverTruststore); - if (!StringUtils.isEmpty(serverKeystorePassword)) - CqlBulkOutputFormat.setServerKeystorePassword(conf, serverKeystorePassword); - if (!StringUtils.isEmpty(serverTruststorePassword)) - CqlBulkOutputFormat.setServerTruststorePassword(conf, serverTruststorePassword); - if (!StringUtils.isEmpty(serverCipherSuites)) - CqlBulkOutputFormat.setServerCipherSuites(conf, serverCipherSuites); - } ++ conf.set(CqlBulkRecordWriter.MAX_FAILED_HOSTS, String.valueOf(bulkMaxFailedHosts)); ++ if (partitionerClass!= null) ++ ConfigHelper.setInputPartitioner(conf, partitionerClass); + } + setConnectionInformation(); if (ConfigHelper.getOutputRpcPort(conf) == 0) @@@ -773,6 -773,37 +804,25 @@@ if (urlQuery.containsKey("output_query")) outputQuery = urlQuery.get("output_query"); + if (urlQuery.containsKey("bulk_output_format")) + bulkOutputFormat = Boolean.valueOf(urlQuery.get("bulk_output_format")); + if (urlQuery.containsKey("bulk_cf_schema")) + bulkCfSchema = urlQuery.get("bulk_cf_schema"); + if (urlQuery.containsKey("bulk_insert_statement")) + bulkInsertStatement = urlQuery.get("bulk_insert_statement"); + if (urlQuery.containsKey("bulk_output_location")) + bulkOutputLocation = urlQuery.get("bulk_output_location"); + if (urlQuery.containsKey("bulk_buff_size")) + bulkBuffSize = Integer.valueOf(urlQuery.get("bulk_buff_size")); + if (urlQuery.containsKey("bulk_stream_throttle")) + bulkStreamThrottle = Integer.valueOf(urlQuery.get("bulk_stream_throttle")); + if (urlQuery.containsKey("bulk_max_failed_hosts")) + bulkMaxFailedHosts = Integer.valueOf(urlQuery.get("bulk_max_failed_hosts")); - if (urlQuery.containsKey("storage_port")) - storagePort = Integer.valueOf(urlQuery.get("storage_port")); - if (urlQuery.containsKey("ssl_storage_port")) - sslStoragePort = Integer.valueOf(urlQuery.get("ssl_storage_port")); - if (urlQuery.containsKey("internode_encrypt")) - internodeEncrypt = urlQuery.get("internode_encrypt"); - if (urlQuery.containsKey("server_keystore")) - serverKeystore = urlQuery.get("server_keystore"); - if (urlQuery.containsKey("server_truststore")) - serverTruststore = urlQuery.get("server_truststore"); - if (urlQuery.containsKey("server_keystore_pass")) - serverKeystorePassword = urlQuery.get("server_keystore_pass"); - if (urlQuery.containsKey("server_truststore_pass")) - serverTruststorePassword = urlQuery.get("server_truststore_pass"); - if (urlQuery.containsKey("server_cipher_suites")) - serverCipherSuites = urlQuery.get("server_cipher_suites"); ++ if (urlQuery.containsKey("bulk_delete_source")) ++ bulkDeleteSourceOnSuccess = Boolean.parseBoolean(urlQuery.get("bulk_delete_source")); ++ if (urlQuery.containsKey("bulk_table_alias")) ++ bulkTableAlias = urlQuery.get("bulk_table_alias"); + //split size if (urlQuery.containsKey("split_size")) splitSize = Integer.parseInt(urlQuery.get("split_size")); @@@ -855,10 -888,20 +905,13 @@@ "[&keep_alive=<keep_alive>][&auth_provider=<auth_provider>][&trust_store_path=<trust_store_path>]" + "[&key_store_path=<key_store_path>][&trust_store_password=<trust_store_password>]" + "[&key_store_password=<key_store_password>][&cipher_suites=<cipher_suites>][&input_cql=<input_cql>]" + - "[columns=<columns>][where_clause=<where_clause>]]': " + e.getMessage()); - } + "[columns=<columns>][where_clause=<where_clause>]" + - "[&bulk_cf_schema=bulk_cf_schema][&bulk_insert_statement=bulk_insert_statement]" + - "[&bulk_output_location=<bulk_output_location>][&bulk_buff_size=<bulk_buff_size>]" + - "[&storage_port=<storage_port>][&ssl_storage_port=<ssl_storage_port>]" + - "[&server_keystore=<server_keystore>][&server_keystore_pass=<server_keystore_pass>]" + - "[&server_truststore=<server_truststore>][&server_truststore_pass=<server_truststore_pass>]" + - "[&server_cipher_suites=<server_cipher_suites>][&internode_encrypt=<internode_encrypt>]" + ++ "[&bulk_cf_schema=bulk_cf_schema][&bulk_insert_statement=bulk_insert_statement][&bulk_table_alias=<bulk_table_alias>]" + ++ "[&bulk_output_location=<bulk_output_location>][&bulk_buff_size=<bulk_buff_size>][&bulk_delete_source=<bulk_delete_source>]" + + "[&bulk_stream_throttle=<bulk_stream_throttle>][&bulk_max_failed_hosts=<bulk_max_failed_hosts>]]': " + e.getMessage()); + } } - /** - * Thrift API can't handle null, so use empty byte array - */ public ByteBuffer nullToBB() { return ByteBuffer.wrap(new byte[0]); http://git-wip-us.apache.org/repos/asf/cassandra/blob/cd4a1e6a/test/conf/cassandra_pig.yaml ---------------------------------------------------------------------- diff --cc test/conf/cassandra_pig.yaml index 0000000,0000000..68615cf new file mode 100644 --- /dev/null +++ b/test/conf/cassandra_pig.yaml @@@ -1,0 -1,0 +1,41 @@@ ++# ++# Warning! ++# Consider the effects on 'o.a.c.i.s.LegacySSTableTest' before changing schemas in this file. ++# ++cluster_name: Test Cluster ++memtable_allocation_type: offheap_objects ++commitlog_sync: batch ++commitlog_sync_batch_window_in_ms: 1.0 ++commitlog_segment_size_in_mb: 5 ++commitlog_directory: build/test/cassandra/commitlog ++partitioner: org.apache.cassandra.dht.Murmur3Partitioner ++listen_address: 127.0.0.1 ++storage_port: 7010 ++rpc_port: 9170 ++start_native_transport: true ++native_transport_port: 9042 ++column_index_size_in_kb: 4 ++saved_caches_directory: build/test/cassandra/saved_caches ++data_file_directories: ++ - build/test/cassandra/data ++disk_access_mode: mmap ++seed_provider: ++ - class_name: org.apache.cassandra.locator.SimpleSeedProvider ++ parameters: ++ - seeds: "127.0.0.1" ++endpoint_snitch: org.apache.cassandra.locator.SimpleSnitch ++dynamic_snitch: true ++request_scheduler: org.apache.cassandra.scheduler.RoundRobinScheduler ++request_scheduler_id: keyspace ++server_encryption_options: ++ internode_encryption: none ++ keystore: conf/.keystore ++ keystore_password: cassandra ++ truststore: conf/.truststore ++ truststore_password: cassandra ++incremental_backups: true ++concurrent_compactors: 4 ++compaction_throughput_mb_per_sec: 0 ++row_cache_class_name: org.apache.cassandra.cache.OHCProvider ++row_cache_size_in_mb: 16 ++enable_user_defined_functions: true http://git-wip-us.apache.org/repos/asf/cassandra/blob/cd4a1e6a/test/pig/org/apache/cassandra/pig/CqlTableTest.java ---------------------------------------------------------------------- diff --cc test/pig/org/apache/cassandra/pig/CqlTableTest.java index 72fdd5a,2e1758e..3902fce --- a/test/pig/org/apache/cassandra/pig/CqlTableTest.java +++ b/test/pig/org/apache/cassandra/pig/CqlTableTest.java @@@ -265,4 -317,33 +272,32 @@@ public class CqlTableTest extends PigTe Assert.fail("Can't fetch any data"); } } + + @Test - public void testCqlStorageSingleKeyTableBulkLoad() - throws AuthenticationException, AuthorizationException, InvalidRequestException, UnavailableException, TimedOutException, TException, NotFoundException, SchemaDisagreementException, IOException ++ public void testCqlStorageSingleKeyTableBulkLoad() throws TException, IOException + { + pig.setBatchOn(); + //input_cql=select * from moredata where token(x) > ? and token(x) <= ? + pig.registerQuery("moretestvalues= LOAD 'cql://cql3ks/moredata?" + defaultParameters + nativeParameters + "&input_cql=select%20*%20from%20moredata%20where%20token(x)%20%3E%20%3F%20and%20token(x)%20%3C%3D%20%3F' USING CqlNativeStorage();"); + pig.registerQuery("insertformat= FOREACH moretestvalues GENERATE TOTUPLE(x, y);"); + pig.registerQuery("STORE insertformat INTO 'cql://cql3ks/test_bulk?" + defaultParameters + nativeParameters + "&bulk_output_format=true&bulk_cf_schema=CREATE%20TABLE%20cql3ks.test_bulk%20(a%20int%20PRIMARY%20KEY%2C%20b%20int)&bulk_insert_statement=Insert%20into%20cql3ks.test_bulk(a%2C%20b)%20values(%3F%2C%3F)' USING CqlNativeStorage();"); + pig.executeBatch(); + + //(5,5) + //(6,6) + //(4,4) + //(2,2) + //(3,3) + //(1,1) + //input_cql=select * from test_bulk1 where token(a) > ? and token(a) <= ? + pig.registerQuery("result= LOAD 'cql://cql3ks/test_bulk?" + defaultParameters + nativeParameters + "&input_cql=select%20*%20from%20test_bulk%20where%20token(a)%20%3E%20%3F%20and%20token(a)%20%3C%3D%20%3F' USING CqlNativeStorage();"); + Iterator<Tuple> it = pig.openIterator("result"); + int count = 0; + while (it.hasNext()) { + Tuple t = it.next(); + Assert.assertEquals(t.get(0), t.get(1)); + count ++; + } + Assert.assertEquals(6, count); + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/cd4a1e6a/test/pig/org/apache/cassandra/pig/PigTestBase.java ---------------------------------------------------------------------- diff --cc test/pig/org/apache/cassandra/pig/PigTestBase.java index 8c27f6c,e6964f8..a8a9de5 --- a/test/pig/org/apache/cassandra/pig/PigTestBase.java +++ b/test/pig/org/apache/cassandra/pig/PigTestBase.java @@@ -54,7 -65,8 +54,7 @@@ public class PigTestBase extends Schema protected static Configuration conf; protected static MiniCluster cluster; protected static PigServer pig; - protected static String defaultParameters= "init_address=localhost&rpc_port=9170&partitioner=org.apache.cassandra.dht.ByteOrderedPartitioner"; - protected static String defaultParameters= "init_address=localhost&rpc_port=9170&partitioner=org.apache.cassandra.dht.ByteOrderedPartitioner" + - "&storage_port=7010&ssl_storage_port=7011&internode_encrypt=NONE"; ++ protected static String defaultParameters= "init_address=localhost&rpc_port=9170&partitioner=org.apache.cassandra.dht.Murmur3Partitioner"; protected static String nativeParameters = "&core_conns=2&max_conns=10&min_simult_reqs=3&max_simult_reqs=10&native_timeout=10000000" + "&native_read_timeout=10000000&send_buff_size=4096&receive_buff_size=4096&solinger=3" + "&tcp_nodelay=true&reuse_address=true&keep_alive=true&native_port=9042"; @@@ -62,6 -74,6 +62,7 @@@ static { System.setProperty("logback.configurationFile", "logback-test.xml"); ++ System.setProperty("cassandra.config", "cassandra_pig.yaml"); } @AfterClass