Merge branch 'cassandra-2.1' into cassandra-2.2

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/cd4a1e6a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/cd4a1e6a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/cd4a1e6a

Branch: refs/heads/cassandra-2.2
Commit: cd4a1e6acc51b0f708127a50d37dd7832bbe8dfa
Parents: 9791796 c7b4073
Author: Aleksey Yeschenko <alek...@apache.org>
Authored: Tue Sep 15 16:12:45 2015 +0100
Committer: Aleksey Yeschenko <alek...@apache.org>
Committed: Tue Sep 15 16:12:45 2015 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../hadoop/AbstractColumnFamilyInputFormat.java |   2 +
 .../hadoop/cql3/CqlBulkRecordWriter.java        |  13 +-
 .../cassandra/hadoop/pig/CqlNativeStorage.java  | 171 ++++++++++++-------
 test/conf/cassandra_pig.yaml                    |  41 +++++
 .../org/apache/cassandra/pig/CqlTableTest.java  |  35 ++++
 .../org/apache/cassandra/pig/PigTestBase.java   |   3 +-
 7 files changed, 205 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/cd4a1e6a/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index bd24781,5f11049..b0ade42
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,17 -1,8 +1,18 @@@
 -2.1.10
 +2.2.2
 + * Defer default role manager setup until all nodes are on 2.2+ (CASSANDRA-9761)
 + * Cancel transaction for sstables we wont redistribute index summary
 +   for (CASSANDRA-10270)
 + * Handle missing RoleManager in config after upgrade to 2.2 (CASSANDRA-10209)
 + * Retry snapshot deletion after compaction and gc on Windows (CASSANDRA-10222)
 + * Fix failure to start with space in directory path on Windows (CASSANDRA-10239)
 + * Fix repair hang when snapshot failed (CASSANDRA-10057)
 + * Fall back to 1/4 commitlog volume for commitlog_total_space on small disks
 +   (CASSANDRA-10199)
 +Merged from 2.1:
+  * (Pig) support BulkOutputFormat as a URL parameter (CASSANDRA-7410)
   * BATCH statement is broken in cqlsh (CASSANDRA-10272)
   * Added configurable warning threshold for GC duration (CASSANDRA-8907)
 - * (cqlsh) Make cqlsh PEP8 compliant (CASSANDRA-10066)
 + * (cqlsh) Make cqlsh PEP8 Compliant (CASSANDRA-10066)
   * (cqlsh) Fix error when starting cqlsh with --debug (CASSANDRA-10282)
   * Scrub, Cleanup and Upgrade do not unmark compacting until all operations
     have completed, regardless of the occurence of exceptions (CASSANDRA-10274)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cd4a1e6a/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java
index 4dd53ff,e8de0f2..9c45bfe
--- a/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java
@@@ -24,23 -32,31 +24,24 @@@ import java.util.concurrent.*
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  
 -import org.apache.cassandra.auth.IAuthenticator;
 +import com.datastax.driver.core.Host;
 +import com.datastax.driver.core.Metadata;
 +import com.datastax.driver.core.ResultSet;
 +import com.datastax.driver.core.Row;
 +import com.datastax.driver.core.Session;
 +import com.datastax.driver.core.TokenRange;
++
 +import org.apache.cassandra.db.SystemKeyspace;
 +import org.apache.cassandra.dht.ByteOrderedPartitioner;
  import org.apache.cassandra.dht.IPartitioner;
 +import org.apache.cassandra.dht.OrderPreservingPartitioner;
  import org.apache.cassandra.dht.Range;
  import org.apache.cassandra.dht.Token;
 -import org.apache.cassandra.thrift.AuthenticationRequest;
 -import org.apache.cassandra.thrift.Cassandra;
 -import org.apache.cassandra.thrift.CfSplit;
 -import org.apache.cassandra.thrift.InvalidRequestException;
 +import org.apache.cassandra.hadoop.cql3.*;
  import org.apache.cassandra.thrift.KeyRange;
 -import org.apache.cassandra.thrift.TokenRange;
 -import org.apache.commons.lang3.StringUtils;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.mapred.JobConf;
 -import org.apache.hadoop.mapreduce.InputFormat;
 -import org.apache.hadoop.mapreduce.InputSplit;
 -import org.apache.hadoop.mapreduce.JobContext;
 -import org.apache.hadoop.mapreduce.TaskAttemptContext;
 -import org.apache.hadoop.mapreduce.TaskAttemptID;
 -import org.apache.thrift.TApplicationException;
 -import org.apache.thrift.TException;
 -import org.apache.thrift.protocol.TBinaryProtocol;
 -import org.apache.thrift.protocol.TProtocol;
 -import org.apache.thrift.transport.TTransport;
 -import org.apache.thrift.transport.TTransportException;
 -
 +import org.apache.hadoop.mapreduce.*;
  
  public abstract class AbstractColumnFamilyInputFormat<K, Y> extends 
InputFormat<K, Y> implements org.apache.hadoop.mapred.InputFormat<K, Y>
  {
@@@ -226,64 -271,83 +227,65 @@@
          }
      }
  
 -    private List<CfSplit> getSubSplits(String keyspace, String cfName, 
TokenRange range, Configuration conf) throws IOException
 +    private Map<TokenRange, Long> getSubSplits(String keyspace, String 
cfName, TokenRange range, Configuration conf) throws IOException
      {
 -        int splitsize = ConfigHelper.getInputSplitSize(conf);
 -        for (int i = 0; i < range.rpc_endpoints.size(); i++)
 +        int splitSize = ConfigHelper.getInputSplitSize(conf);
 +        try
          {
 -            String host = range.rpc_endpoints.get(i);
 -
 -            if (host == null || host.equals("0.0.0.0"))
 -                host = range.endpoints.get(i);
 -
 -            try
 -            {
 -                Cassandra.Client client = ConfigHelper.createConnection(conf, 
host, ConfigHelper.getInputRpcPort(conf));
 -                client.set_keyspace(keyspace);
 -
 -                try
 -                {
 -                    return client.describe_splits_ex(cfName, 
range.start_token, range.end_token, splitsize);
 -                }
 -                catch (TApplicationException e)
 -                {
 -                    // fallback to guessing split size if talking to a server 
without describe_splits_ex method
 -                    if (e.getType() == TApplicationException.UNKNOWN_METHOD)
 -                    {
 -                        List<String> splitPoints = 
client.describe_splits(cfName, range.start_token, range.end_token, splitsize);
 -                        return tokenListToSplits(splitPoints, splitsize);
 -                    }
 -                    throw e;
 -                }
 -            }
 -            catch (IOException e)
 -            {
 -                logger.debug("failed connect to endpoint {}", host, e);
 -            }
 -            catch (InvalidRequestException e)
 -            {
 -                throw new RuntimeException(e);
 -            }
 -            catch (TException e)
 -            {
 -                throw new RuntimeException(e);
 -            }
 +            return describeSplits(keyspace, cfName, range, splitSize);
 +        }
 +        catch (Exception e)
 +        {
 +            throw new RuntimeException(e);
          }
 -        throw new IOException("failed connecting to all endpoints " + 
StringUtils.join(range.endpoints, ","));
      }
  
 -    private List<CfSplit> tokenListToSplits(List<String> splitTokens, int 
splitsize)
 +    private Map<TokenRange, Set<Host>> getRangeMap(Configuration conf, String 
keyspace)
      {
 -        List<CfSplit> splits = 
Lists.newArrayListWithExpectedSize(splitTokens.size() - 1);
 -        for (int j = 0; j < splitTokens.size() - 1; j++)
 -            splits.add(new CfSplit(splitTokens.get(j), splitTokens.get(j + 
1), splitsize));
 -        return splits;
 +        try (Session session = 
CqlConfigHelper.getInputCluster(ConfigHelper.getInputInitialAddress(conf).split(","),
 conf).connect())
 +        {
 +            Map<TokenRange, Set<Host>> map = new HashMap<>();
 +            Metadata metadata = session.getCluster().getMetadata();
 +            for (TokenRange tokenRange : metadata.getTokenRanges())
 +                map.put(tokenRange, metadata.getReplicas('"' + keyspace + 
'"', tokenRange));
 +            return map;
 +        }
      }
  
 -    private List<TokenRange> getRangeMap(Configuration conf) throws 
IOException
 +    private Map<TokenRange, Long> describeSplits(String keyspace, String 
table, TokenRange tokenRange, int splitSize)
      {
 -        Cassandra.Client client = 
ConfigHelper.getClientFromInputAddressList(conf);
 -
 -        List<TokenRange> map;
 -        try
 +        String query = String.format("SELECT mean_partition_size, 
partitions_count " +
 +                                     "FROM %s.%s " +
 +                                     "WHERE keyspace_name = ? AND table_name 
= ? AND range_start = ? AND range_end = ?",
 +                                     SystemKeyspace.NAME,
 +                                     SystemKeyspace.SIZE_ESTIMATES);
 +
 +        ResultSet resultSet = session.execute(query, keyspace, table, 
tokenRange.getStart().toString(), tokenRange.getEnd().toString());
 +
 +        Row row = resultSet.one();
 +        // If we have no data on this split, return the full split i.e., do 
not sub-split
 +        // Assume smallest granularity of partition count available from 
CASSANDRA-7688
 +        if (row == null)
          {
 -            map = 
client.describe_local_ring(ConfigHelper.getInputKeyspace(conf));
 +            Map<TokenRange, Long> wrappedTokenRange = new HashMap<>();
 +            wrappedTokenRange.put(tokenRange, (long) 128);
 +            return wrappedTokenRange;
          }
 -        catch (InvalidRequestException e)
 -        {
 -            throw new RuntimeException(e);
 -        }
 -        catch (TException e)
 -        {
 -            throw new RuntimeException(e);
 -        }
 -        return map;
 +
 +        long meanPartitionSize = row.getLong("mean_partition_size");
 +        long partitionCount = row.getLong("partitions_count");
 +
 +        int splitCount = (int)((meanPartitionSize * partitionCount) / 
splitSize);
++        if (splitCount <= 0) splitCount = 1;
 +        List<TokenRange> splitRanges = tokenRange.splitEvenly(splitCount);
 +        Map<TokenRange, Long> rangesWithLength = new HashMap<>();
 +        for (TokenRange range : splitRanges)
 +            rangesWithLength.put(range, partitionCount/splitCount);
 +
 +        return rangesWithLength;
      }
  
 -    //
      // Old Hadoop API
 -    //
      public org.apache.hadoop.mapred.InputSplit[] getSplits(JobConf jobConf, 
int numSplits) throws IOException
      {
          TaskAttemptContext tac = HadoopCompat.newTaskAttemptContext(jobConf, 
new TaskAttemptID());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cd4a1e6a/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java
index e77c4c8,ced8aa9..9e6e23b
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java
@@@ -23,16 -22,15 +23,17 @@@ import java.io.IOException
  import java.net.InetAddress;
  import java.net.UnknownHostException;
  import java.nio.ByteBuffer;
 -import java.util.HashSet;
 -import java.util.List;
 -import java.util.Set;
 -import java.util.UUID;
 +import java.util.*;
 +import java.util.concurrent.*;
  
 -import org.apache.cassandra.config.EncryptionOptions;
 -import org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
- 
 +import org.apache.cassandra.config.CFMetaData;
 +import org.apache.cassandra.config.Config;
 +import org.apache.cassandra.config.DatabaseDescriptor;
++import org.apache.cassandra.dht.IPartitioner;
++import org.apache.cassandra.dht.Murmur3Partitioner;
  import org.apache.cassandra.exceptions.InvalidRequestException;
 -import org.apache.cassandra.hadoop.AbstractBulkRecordWriter;
  import org.apache.cassandra.hadoop.BulkRecordWriter;
  import org.apache.cassandra.hadoop.ConfigHelper;
  import org.apache.cassandra.hadoop.HadoopCompat;
@@@ -84,6 -67,6 +85,7 @@@ public class CqlBulkRecordWriter extend
      private String insertStatement;
      private File outputDir;
      private boolean deleteSrc;
++    private IPartitioner partitioner;
  
      CqlBulkRecordWriter(TaskAttemptContext context) throws IOException
      {
@@@ -113,55 -90,45 +115,64 @@@
      {
          // if anything is missing, exceptions will be thrown here, instead of 
on write()
          keyspace = ConfigHelper.getOutputKeyspace(conf);
 -        columnFamily = ConfigHelper.getOutputColumnFamily(conf);
 -        schema = CqlBulkOutputFormat.getColumnFamilySchema(conf, 
columnFamily);
 -        insertStatement = 
CqlBulkOutputFormat.getColumnFamilyInsertStatement(conf, columnFamily);
 -        outputDir = getColumnFamilyDirectory();
 +        table = ConfigHelper.getOutputColumnFamily(conf);
 +        
 +        // check if table is aliased
 +        String aliasedCf = CqlBulkOutputFormat.getTableForAlias(conf, table);
 +        if (aliasedCf != null)
 +            table = aliasedCf;
 +        
 +        schema = CqlBulkOutputFormat.getTableSchema(conf, table);
 +        insertStatement = CqlBulkOutputFormat.getTableInsertStatement(conf, 
table);
 +        outputDir = getTableDirectory();
          deleteSrc = CqlBulkOutputFormat.getDeleteSourceOnSuccess(conf);
++        try
++        {
++            partitioner = ConfigHelper.getInputPartitioner(conf);
++        }
++        catch (Exception e)
++        {
++            partitioner = Murmur3Partitioner.instance;
++        }
 +    }
 +
 +    protected String getOutputLocation() throws IOException
 +    {
 +        String dir = conf.get(OUTPUT_LOCATION, 
System.getProperty("java.io.tmpdir"));
 +        if (dir == null)
 +            throw new IOException("Output directory not defined, if hadoop is 
not setting java.io.tmpdir then define " + OUTPUT_LOCATION);
 +        return dir;
      }
  
 -    
      private void prepareWriter() throws IOException
      {
 -        try
 +        if (writer == null)
          {
 -            if (writer == null)
 -            {
 -                writer = CQLSSTableWriter.builder()
 -                    .forTable(schema)
 -                    .using(insertStatement)
 -                    .withPartitioner(ConfigHelper.getOutputPartitioner(conf))
 -                    .inDirectory(outputDir)
 -                    
.withBufferSizeInMB(Integer.parseInt(conf.get(BUFFER_SIZE_IN_MB, "64")))
 -                    .build();
 -            }
 -            if (loader == null)
 -            {
 -                BulkLoader.ExternalClient externalClient = 
getExternalClient(conf);
 -                this.loader = new SSTableLoader(outputDir, externalClient, 
new BulkRecordWriter.NullOutputHandler()) {
 -                    @Override
 -                    public void onSuccess(StreamState finalState)
 -                    {
 -                        if (deleteSrc)
 -                            FileUtils.deleteRecursive(outputDir);
 -                    }
 -                };
 -            }
 +            writer = CQLSSTableWriter.builder()
 +                                     .forTable(schema)
 +                                     .using(insertStatement)
 +                                     
.withPartitioner(ConfigHelper.getOutputPartitioner(conf))
 +                                     .inDirectory(outputDir)
 +                                     
.withBufferSizeInMB(Integer.parseInt(conf.get(BUFFER_SIZE_IN_MB, "64")))
++                                     .withPartitioner(partitioner)
 +                                     .build();
          }
 -        catch (Exception e)
 +
 +        if (loader == null)
          {
 -            throw new IOException(e);
 -        }      
 +            ExternalClient externalClient = new ExternalClient(conf);
 +            externalClient.setTableMetadata(CFMetaData.compile(schema, 
keyspace));
 +
 +            loader = new SSTableLoader(outputDir, externalClient, new 
BulkRecordWriter.NullOutputHandler())
 +            {
 +                @Override
 +                public void onSuccess(StreamState finalState)
 +                {
 +                    if (deleteSrc)
 +                        FileUtils.deleteRecursive(outputDir);
 +                }
 +            };
 +        }
      }
      
      /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cd4a1e6a/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
index dc3c174,5287bf5..223a848
--- a/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
@@@ -17,71 -17,47 +17,75 @@@
   */
  package org.apache.cassandra.hadoop.pig;
  
 +import java.io.ByteArrayInputStream;
 +import java.io.ByteArrayOutputStream;
  import java.io.IOException;
 +import java.io.ObjectInputStream;
 +import java.io.ObjectOutputStream;
 +import java.io.Serializable;
 +import java.io.UnsupportedEncodingException;
 +import java.net.URLDecoder;
  import java.nio.ByteBuffer;
 -import java.nio.charset.CharacterCodingException;
  import java.util.*;
  
 -import org.apache.cassandra.config.CFMetaData;
 -import org.apache.cassandra.config.ColumnDefinition;
 -import 
org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions.InternodeEncryption;
 -import org.apache.cassandra.db.BufferCell;
 -import org.apache.cassandra.db.Cell;
 -import org.apache.cassandra.db.composites.CellNames;
 +import com.datastax.driver.core.ColumnMetadata;
 +import com.datastax.driver.core.Metadata;
 +import com.datastax.driver.core.Row;
 +import com.datastax.driver.core.Session;
 +import com.datastax.driver.core.TableMetadata;
 +import com.datastax.driver.core.exceptions.NoHostAvailableException;
++
  import org.apache.cassandra.db.marshal.*;
 +import org.apache.cassandra.exceptions.AuthenticationException;
  import org.apache.cassandra.exceptions.ConfigurationException;
 -import org.apache.cassandra.hadoop.AbstractBulkRecordWriter;
 +import org.apache.cassandra.exceptions.SyntaxException;
  import org.apache.cassandra.hadoop.ConfigHelper;
  import org.apache.cassandra.hadoop.HadoopCompat;
+ import org.apache.cassandra.hadoop.cql3.CqlBulkOutputFormat;
++import org.apache.cassandra.hadoop.cql3.CqlBulkRecordWriter;
  import org.apache.cassandra.hadoop.cql3.CqlConfigHelper;
  import org.apache.cassandra.hadoop.cql3.CqlRecordReader;
 -import org.apache.cassandra.thrift.*;
 +import org.apache.cassandra.serializers.CollectionSerializer;
  import org.apache.cassandra.utils.*;
 -import org.apache.commons.lang.StringUtils;
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.mapreduce.*;
 -import org.apache.pig.Expression;
 -import org.apache.pig.ResourceSchema;
 +import org.apache.pig.*;
  import org.apache.pig.Expression.OpType;
  import org.apache.pig.ResourceSchema.ResourceFieldSchema;
  import org.apache.pig.backend.executionengine.ExecException;
  import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
  import org.apache.pig.data.*;
  import org.apache.pig.impl.util.UDFContext;
 -import org.apache.thrift.TException;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
 +import org.yaml.snakeyaml.external.biz.base64Coder.Base64Coder;
  
 -import com.datastax.driver.core.Row;
  
 -public class CqlNativeStorage extends AbstractCassandraStorage
 +public class CqlNativeStorage extends LoadFunc implements StoreFuncInterface, 
LoadMetadata
  {
 +    protected String DEFAULT_INPUT_FORMAT;
 +    protected String DEFAULT_OUTPUT_FORMAT;
 +
 +    protected String username;
 +    protected String password;
 +    protected String keyspace;
 +    protected String column_family;
 +    protected String loadSignature;
 +    protected String storeSignature;
 +
 +    protected Configuration conf;
 +    protected String inputFormatClass;
 +    protected String outputFormatClass;
 +    protected int splitSize = 64 * 1024;
 +    protected String partitionerClass;
 +    protected boolean usePartitionFilter = false;
 +    protected String initHostAddress;
 +    protected String rpcPort;
 +    protected int nativeProtocolVersion = 1;
 +
      private static final Logger logger = 
LoggerFactory.getLogger(CqlNativeStorage.class);
 -    public static String BULK_OUTPUT_FORMAT = 
"org.apache.cassandra.hadoop.cql3.CqlBulkOutputFormat";
++    private static String BULK_OUTPUT_FORMAT = 
"org.apache.cassandra.hadoop.cql3.CqlBulkOutputFormat";
      private int pageSize = 1000;
      private String columns;
      private String outputQuery;
@@@ -109,6 -87,22 +113,16 @@@
      private String nativeSSLCipherSuites;
      private String inputCql;
  
+     private boolean bulkOutputFormat = false;
+     private String bulkCfSchema;
+     private String bulkInsertStatement;
+     private String bulkOutputLocation;
+     private int bulkBuffSize = -1;
+     private int bulkStreamThrottle = -1;
+     private int bulkMaxFailedHosts = -1;
 -    private int storagePort = CqlBulkOutputFormat.DEFAULT_STORAGE_PORT;
 -    private int sslStoragePort = CqlBulkOutputFormat.DEFAULT_SSL_STORAGE_PORT;
 -    private String serverKeystore;
 -    private String serverKeystorePassword;
 -    private String serverTruststore;
 -    private String serverTruststorePassword;
 -    private String serverCipherSuites;
 -    private String internodeEncrypt;
++    private boolean bulkDeleteSourceOnSuccess = true;
++    private String bulkTableAlias;
+ 
      public CqlNativeStorage()
      {
          this(1000);
@@@ -241,83 -240,188 +255,48 @@@
          return obj;
      }
  
 -    /** include key columns */
 -    protected List<ColumnDef> getColumnMetadata(Cassandra.Client client)
 -            throws InvalidRequestException,
 -            UnavailableException,
 -            TimedOutException,
 -            SchemaDisagreementException,
 -            TException,
 -            CharacterCodingException,
 -            org.apache.cassandra.exceptions.InvalidRequestException,
 -            ConfigurationException,
 -            NotFoundException
 -    {
 -        List<ColumnDef> keyColumns = null;
 -        // get key columns
 +    /** get the columnfamily definition for the signature */
 +    protected TableInfo getCfInfo(String signature) throws IOException
 +    {
 +        UDFContext context = UDFContext.getUDFContext();
 +        Properties property = 
context.getUDFProperties(CqlNativeStorage.class);
 +        TableInfo cfInfo;
          try
          {
 -            keyColumns = getKeysMeta(client);
 +            cfInfo = cfdefFromString(property.getProperty(signature));
          }
 -        catch(Exception e)
 +        catch (ClassNotFoundException e)
          {
 -            logger.error("Error in retrieving key columns" , e);
 +            throw new IOException(e);
          }
 -
 -        // get other columns
 -        List<ColumnDef> columns = getColumnMeta(client, false, 
!hasCompactValueAlias);
 -
 -        // combine all columns in a list
 -        if (keyColumns != null && columns != null)
 -            keyColumns.addAll(columns);
 -
 -        return keyColumns;
 +        return cfInfo;
      }
  
 -    /** get keys meta data */
 -    private List<ColumnDef> getKeysMeta(Cassandra.Client client)
 -            throws Exception
 +    /** return the CfInfo for the column family */
 +    protected TableMetadata getCfInfo(Session client)
 +            throws NoHostAvailableException,
 +            AuthenticationException,
 +            IllegalStateException
      {
 -        String query = "SELECT key_aliases, " +
 -                "       column_aliases, " +
 -                "       key_validator, " +
 -                "       comparator, " +
 -                "       keyspace_name, " +
 -                "       value_alias, " +
 -                "       default_validator " +
 -                "FROM system.schema_columnfamilies " +
 -                "WHERE keyspace_name = '%s'" +
 -                "  AND columnfamily_name = '%s' ";
 -
 -        CqlResult result = client.execute_cql3_query(
 -                ByteBufferUtil.bytes(String.format(query, keyspace, 
column_family)),
 -                Compression.NONE,
 -                ConsistencyLevel.ONE);
 -
 -        if (result == null || result.rows == null || result.rows.isEmpty())
 -            return null;
 -
 -        Iterator<CqlRow> iteraRow = result.rows.iterator();
 -        List<ColumnDef> keys = new ArrayList<ColumnDef>();
 -        if (iteraRow.hasNext())
 -        {
 -            CqlRow cqlRow = iteraRow.next();
 -            String name = ByteBufferUtil.string(cqlRow.columns.get(4).value);
 -            logger.debug("Found ksDef name: {}", name);
 -            String keyString = 
ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(0).getValue()));
 -
 -            logger.debug("partition keys: {}", keyString);
 -            List<String> keyNames = FBUtilities.fromJsonList(keyString);
 -
 -            Iterator<String> iterator = keyNames.iterator();
 -            while (iterator.hasNext())
 -            {
 -                ColumnDef cDef = new ColumnDef();
 -                cDef.name = ByteBufferUtil.bytes(iterator.next());
 -                keys.add(cDef);
 -            }
 -            // classic thrift tables
 -            if (keys.size() == 0)
 -            {
 -                CFMetaData cfm = getCFMetaData(keyspace, column_family, 
client);
 -                for (ColumnDefinition def : cfm.partitionKeyColumns())
 -                {
 -                    String key = def.name.toString();
 -                    logger.debug("name: {} ", key);
 -                    ColumnDef cDef = new ColumnDef();
 -                    cDef.name = ByteBufferUtil.bytes(key);
 -                    keys.add(cDef);
 -                }
 -                for (ColumnDefinition def : cfm.clusteringColumns())
 -                {
 -                    String key = def.name.toString();
 -                    logger.debug("name: {} ", key);
 -                    ColumnDef cDef = new ColumnDef();
 -                    cDef.name = ByteBufferUtil.bytes(key);
 -                    keys.add(cDef);
 -                }
 -            }
 -
 -            keyString = 
ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(1).getValue()));
 -
 -            logger.debug("cluster keys: {}", keyString);
 -            keyNames = FBUtilities.fromJsonList(keyString);
 -
 -            iterator = keyNames.iterator();
 -            while (iterator.hasNext())
 -            {
 -                ColumnDef cDef = new ColumnDef();
 -                cDef.name = ByteBufferUtil.bytes(iterator.next());
 -                keys.add(cDef);
 -            }
 -
 -            String validator = 
ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(2).getValue()));
 -            logger.debug("row key validator: {}", validator);
 -            AbstractType<?> keyValidator = parseType(validator);
 -
 -            Iterator<ColumnDef> keyItera = keys.iterator();
 -            if (keyValidator instanceof CompositeType)
 -            {
 -                Iterator<AbstractType<?>> typeItera = ((CompositeType) 
keyValidator).types.iterator();
 -                while (typeItera.hasNext())
 -                    keyItera.next().validation_class = 
typeItera.next().toString();
 -            }
 -            else
 -                keyItera.next().validation_class = keyValidator.toString();
 -
 -            validator = 
ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(3).getValue()));
 -            logger.debug("cluster key validator: {}", validator);
 -
 -            if (keyItera.hasNext() && validator != null && 
!validator.isEmpty())
 -            {
 -                AbstractType<?> clusterKeyValidator = parseType(validator);
 -
 -                if (clusterKeyValidator instanceof CompositeType)
 -                {
 -                    Iterator<AbstractType<?>> typeItera = ((CompositeType) 
clusterKeyValidator).types.iterator();
 -                    while (keyItera.hasNext())
 -                        keyItera.next().validation_class = 
typeItera.next().toString();
 -                }
 -                else
 -                    keyItera.next().validation_class = 
clusterKeyValidator.toString();
 -            }
 -
 -            // compact value_alias column
 -            if (cqlRow.columns.get(5).value != null)
 -            {
 -                try
 -                {
 -                    String compactValidator = 
ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(6).getValue()));
 -                    logger.debug("default validator: {}", compactValidator);
 -                    AbstractType<?> defaultValidator = 
parseType(compactValidator);
 -
 -                    ColumnDef cDef = new ColumnDef();
 -                    cDef.name = cqlRow.columns.get(5).value;
 -                    cDef.validation_class = defaultValidator.toString();
 -                    keys.add(cDef);
 -                    hasCompactValueAlias = true;
 -                }
 -                catch (Exception e)
 -                {
 -                    // no compact column at value_alias
 -                }
 -            }
 -
 -        }
 -        return keys;
 +        // get CF meta data
 +        return 
client.getCluster().getMetadata().getKeyspace(Metadata.quote(keyspace)).getTable(Metadata.quote(column_family));
      }
  
-     /** output: (((name, value), (name, value)), (value ... value), 
(value...value)) */
-     public void putNext(Tuple t) throws IOException
-     {
-         if (t.size() < 1)
-         {
-             // simply nothing here, we can't even delete without a key
-             logger.warn("Empty output skipped, filter empty tuples to 
suppress this warning");
-             return;
-         }
- 
-         if (t.getType(0) == DataType.TUPLE)
-         {
-             if (t.getType(1) == DataType.TUPLE)
-             {
-                 Map<String, ByteBuffer> key = tupleToKeyMap((Tuple)t.get(0));
-                 cqlQueryFromTuple(key, t, 1);
-             }
-             else
-                 throw new IOException("Second argument in output must be a 
tuple");
-         }
-         else
-             throw new IOException("First argument in output must be a tuple");
-     }
- 
      /** convert key tuple to key map */
      private Map<String, ByteBuffer> tupleToKeyMap(Tuple t) throws IOException
      {
          Map<String, ByteBuffer> keys = new HashMap<String, ByteBuffer>();
          for (int i = 0; i < t.size(); i++)
          {
-             if (t.getType(i) == DataType.TUPLE)
-             {
-                 Tuple inner = (Tuple) t.get(i);
-                 if (inner.size() == 2)
-                 {
-                     Object name = inner.get(0);
-                     if (name != null)
-                     {
-                         keys.put(name.toString(), objToBB(inner.get(1)));
-                     }
-                     else
-                         throw new IOException("Key name was empty");
-                 }
-                 else
-                     throw new IOException("Keys were not in name and value 
pairs");
-             }
-             else
-             {
+             if (t.getType(i) != DataType.TUPLE)
                  throw new IOException("keys was not a tuple");
-             }
 -
+             Tuple inner = (Tuple) t.get(i);
+             if (inner.size() != 2)
+                 throw new IOException("Keys were not in name and value 
pairs");
+             Object name = inner.get(0);
+             if (name == null)
+                 throw new IOException("Key name was empty");
+             keys.put(name.toString(), objToBB(inner.get(1)));
          }
          return keys;
      }
@@@ -543,10 -537,41 +517,41 @@@
      private String getWhereClauseForPartitionFilter()
      {
          UDFContext context = UDFContext.getUDFContext();
 -        Properties property = 
context.getUDFProperties(AbstractCassandraStorage.class);
 -        return property.getProperty(PARTITION_FILTER_SIGNATURE);
 +        Properties property = 
context.getUDFProperties(CqlNativeStorage.class);
 +        return property.getProperty(StorageHelper.PARTITION_FILTER_SIGNATURE);
      }
  
+     /**
+      *  output: (((name, value), (name, value)), (value ... value), (value...value))
+      *  bulk output: ((value ... value), (value...value))
+      *
+      * */
+     public void putNext(Tuple t) throws IOException
+     {
+         if (t.size() < 1)
+         {
+             // simply nothing here, we can't even delete without a key
+             logger.warn("Empty output skipped, filter empty tuples to 
suppress this warning");
+             return;
+         }
+ 
+         if (t.getType(0) != DataType.TUPLE)
+             throw new IOException("First argument in output must be a tuple");
+ 
+         if (!bulkOutputFormat && t.getType(1) != DataType.TUPLE)
+             throw new IOException("Second argument in output must be a 
tuple");
+ 
+         if (bulkOutputFormat)
+         {
+             cqlQueryFromTuple(null, t, 0);
+         }
+         else
+         {
+             Map<String, ByteBuffer> key = tupleToKeyMap((Tuple)t.get(0));
+             cqlQueryFromTuple(key, t, 1);
+         }
+     }
+ 
      /** set read configuration settings */
      public void setLocation(String location, Job job) throws IOException
      {
@@@ -672,6 -699,42 +677,32 @@@
          ConfigHelper.setOutputColumnFamily(conf, keyspace, column_family);
          CqlConfigHelper.setOutputCql(conf, outputQuery);
  
+         if (bulkOutputFormat)
+         {
+             DEFAULT_OUTPUT_FORMAT = BULK_OUTPUT_FORMAT;
+             if (bulkCfSchema != null)
 -                CqlBulkOutputFormat.setColumnFamilySchema(conf, 
column_family, bulkCfSchema);
++                CqlBulkOutputFormat.setTableSchema(conf, column_family, 
bulkCfSchema);
+             else
+                 throw new IOException("bulk_cf_schema is missing in input url 
parameter");
+             if (bulkInsertStatement != null)
 -                CqlBulkOutputFormat.setColumnFamilyInsertStatement(conf, 
column_family, bulkInsertStatement);
++                CqlBulkOutputFormat.setTableInsertStatement(conf, 
column_family, bulkInsertStatement);
+             else
+                 throw new IOException("bulk_insert_statement is missing in 
input url parameter");
++            if (bulkTableAlias != null)
++                CqlBulkOutputFormat.setTableAlias(conf, bulkTableAlias, 
column_family); 
++            CqlBulkOutputFormat.setDeleteSourceOnSuccess(conf, 
bulkDeleteSourceOnSuccess);
+             if (bulkOutputLocation != null)
 -                conf.set(AbstractBulkRecordWriter.OUTPUT_LOCATION, 
bulkOutputLocation);
++                conf.set(CqlBulkRecordWriter.OUTPUT_LOCATION, 
bulkOutputLocation);
+             if (bulkBuffSize > 0)
 -                conf.set(AbstractBulkRecordWriter.BUFFER_SIZE_IN_MB, 
String.valueOf(bulkBuffSize));
++                conf.set(CqlBulkRecordWriter.BUFFER_SIZE_IN_MB, 
String.valueOf(bulkBuffSize));
+             if (bulkStreamThrottle > 0)
 -                conf.set(AbstractBulkRecordWriter.STREAM_THROTTLE_MBITS, 
String.valueOf(bulkStreamThrottle));
++                conf.set(CqlBulkRecordWriter.STREAM_THROTTLE_MBITS, 
String.valueOf(bulkStreamThrottle));
+             if (bulkMaxFailedHosts > 0)
 -                conf.set(AbstractBulkRecordWriter.MAX_FAILED_HOSTS, 
String.valueOf(bulkMaxFailedHosts));
 -            CqlBulkOutputFormat.setSSLStoragePort(conf, sslStoragePort);
 -            CqlBulkOutputFormat.setStoragePort(conf, storagePort);
 -            if (serverEncrypted())
 -            {
 -                if (!StringUtils.isEmpty(serverKeystore))
 -                    CqlBulkOutputFormat.setServerKeystore(conf, 
serverKeystore);
 -                if (!StringUtils.isEmpty(serverTruststore))
 -                    CqlBulkOutputFormat.setServerTruststore(conf, 
serverTruststore);
 -                if (!StringUtils.isEmpty(serverKeystorePassword))
 -                    CqlBulkOutputFormat.setServerKeystorePassword(conf, 
serverKeystorePassword);
 -                if (!StringUtils.isEmpty(serverTruststorePassword))
 -                    CqlBulkOutputFormat.setServerTruststorePassword(conf, 
serverTruststorePassword);
 -                if (!StringUtils.isEmpty(serverCipherSuites))
 -                    CqlBulkOutputFormat.setServerCipherSuites(conf, 
serverCipherSuites);
 -            }
++                conf.set(CqlBulkRecordWriter.MAX_FAILED_HOSTS, 
String.valueOf(bulkMaxFailedHosts));
++            if (partitionerClass!= null)
++                ConfigHelper.setInputPartitioner(conf, partitionerClass);
+         }
+ 
          setConnectionInformation();
  
          if (ConfigHelper.getOutputRpcPort(conf) == 0)
@@@ -773,6 -773,37 +804,25 @@@
                  if (urlQuery.containsKey("output_query"))
                      outputQuery = urlQuery.get("output_query");
  
+                 if (urlQuery.containsKey("bulk_output_format"))
+                     bulkOutputFormat = 
Boolean.valueOf(urlQuery.get("bulk_output_format"));
+                 if (urlQuery.containsKey("bulk_cf_schema"))
+                     bulkCfSchema = urlQuery.get("bulk_cf_schema");
+                 if (urlQuery.containsKey("bulk_insert_statement"))
+                     bulkInsertStatement = 
urlQuery.get("bulk_insert_statement");
+                 if (urlQuery.containsKey("bulk_output_location"))
+                     bulkOutputLocation = urlQuery.get("bulk_output_location");
+                 if (urlQuery.containsKey("bulk_buff_size"))
+                     bulkBuffSize = 
Integer.valueOf(urlQuery.get("bulk_buff_size"));
+                 if (urlQuery.containsKey("bulk_stream_throttle"))
+                     bulkStreamThrottle = 
Integer.valueOf(urlQuery.get("bulk_stream_throttle"));
+                 if (urlQuery.containsKey("bulk_max_failed_hosts"))
+                     bulkMaxFailedHosts = 
Integer.valueOf(urlQuery.get("bulk_max_failed_hosts"));
 -                if (urlQuery.containsKey("storage_port"))
 -                    storagePort = 
Integer.valueOf(urlQuery.get("storage_port"));
 -                if (urlQuery.containsKey("ssl_storage_port"))
 -                    sslStoragePort = 
Integer.valueOf(urlQuery.get("ssl_storage_port"));
 -                if (urlQuery.containsKey("internode_encrypt"))
 -                    internodeEncrypt = urlQuery.get("internode_encrypt");
 -                if (urlQuery.containsKey("server_keystore"))
 -                    serverKeystore = urlQuery.get("server_keystore");
 -                if (urlQuery.containsKey("server_truststore"))
 -                    serverTruststore = urlQuery.get("server_truststore");
 -                if (urlQuery.containsKey("server_keystore_pass"))
 -                    serverKeystorePassword = 
urlQuery.get("server_keystore_pass");
 -                if (urlQuery.containsKey("server_truststore_pass"))
 -                    serverTruststorePassword = 
urlQuery.get("server_truststore_pass");
 -                if (urlQuery.containsKey("server_cipher_suites"))
 -                    serverCipherSuites = urlQuery.get("server_cipher_suites");
++                if (urlQuery.containsKey("bulk_delete_source"))
++                    bulkDeleteSourceOnSuccess = 
Boolean.parseBoolean(urlQuery.get("bulk_delete_source"));
++                if (urlQuery.containsKey("bulk_table_alias"))
++                    bulkTableAlias = urlQuery.get("bulk_table_alias");
+ 
                  //split size
                  if (urlQuery.containsKey("split_size"))
                      splitSize = Integer.parseInt(urlQuery.get("split_size"));
@@@ -855,10 -888,20 +905,13 @@@
                      
"[&keep_alive=<keep_alive>][&auth_provider=<auth_provider>][&trust_store_path=<trust_store_path>]"
 +
                      
"[&key_store_path=<key_store_path>][&trust_store_password=<trust_store_password>]"
 +
                      
"[&key_store_password=<key_store_password>][&cipher_suites=<cipher_suites>][&input_cql=<input_cql>]"
 +
-                     "[columns=<columns>][where_clause=<where_clause>]]': " + 
e.getMessage());
-         }
+                     "[columns=<columns>][where_clause=<where_clause>]" +
 -                    
"[&bulk_cf_schema=bulk_cf_schema][&bulk_insert_statement=bulk_insert_statement]"
 +
 -                    
"[&bulk_output_location=<bulk_output_location>][&bulk_buff_size=<bulk_buff_size>]"
 +
 -                    
"[&storage_port=<storage_port>][&ssl_storage_port=<ssl_storage_port>]" +
 -                    
"[&server_keystore=<server_keystore>][&server_keystore_pass=<server_keystore_pass>]"
 +
 -                    
"[&server_truststore=<server_truststore>][&server_truststore_pass=<server_truststore_pass>]"
 +
 -                    
"[&server_cipher_suites=<server_cipher_suites>][&internode_encrypt=<internode_encrypt>]"
 +
++                    
"[&bulk_cf_schema=bulk_cf_schema][&bulk_insert_statement=bulk_insert_statement][&bulk_table_alias=<bulk_table_alias>]"
 +
++                    
"[&bulk_output_location=<bulk_output_location>][&bulk_buff_size=<bulk_buff_size>][&bulk_delete_source=<bulk_delete_source>]"
 +
+                     
"[&bulk_stream_throttle=<bulk_stream_throttle>][&bulk_max_failed_hosts=<bulk_max_failed_hosts>]]':
 " +  e.getMessage());
+          }
      }
  
 -    /**
 -     * Thrift API can't handle null, so use empty byte array
 -     */
      public ByteBuffer nullToBB()
      {
          return ByteBuffer.wrap(new byte[0]);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cd4a1e6a/test/conf/cassandra_pig.yaml
----------------------------------------------------------------------
diff --cc test/conf/cassandra_pig.yaml
index 0000000,0000000..68615cf
new file mode 100644
--- /dev/null
+++ b/test/conf/cassandra_pig.yaml
@@@ -1,0 -1,0 +1,41 @@@
++#
++# Warning!
++# Consider the effects on 'o.a.c.i.s.LegacySSTableTest' before changing schemas in this file.
++#
++cluster_name: Test Cluster
++memtable_allocation_type: offheap_objects
++commitlog_sync: batch
++commitlog_sync_batch_window_in_ms: 1.0
++commitlog_segment_size_in_mb: 5
++commitlog_directory: build/test/cassandra/commitlog
++partitioner: org.apache.cassandra.dht.Murmur3Partitioner
++listen_address: 127.0.0.1
++storage_port: 7010
++rpc_port: 9170
++start_native_transport: true
++native_transport_port: 9042
++column_index_size_in_kb: 4
++saved_caches_directory: build/test/cassandra/saved_caches
++data_file_directories:
++    - build/test/cassandra/data
++disk_access_mode: mmap
++seed_provider:
++    - class_name: org.apache.cassandra.locator.SimpleSeedProvider
++      parameters:
++          - seeds: "127.0.0.1"
++endpoint_snitch: org.apache.cassandra.locator.SimpleSnitch
++dynamic_snitch: true
++request_scheduler: org.apache.cassandra.scheduler.RoundRobinScheduler
++request_scheduler_id: keyspace
++server_encryption_options:
++    internode_encryption: none
++    keystore: conf/.keystore
++    keystore_password: cassandra
++    truststore: conf/.truststore
++    truststore_password: cassandra
++incremental_backups: true
++concurrent_compactors: 4
++compaction_throughput_mb_per_sec: 0
++row_cache_class_name: org.apache.cassandra.cache.OHCProvider
++row_cache_size_in_mb: 16
++enable_user_defined_functions: true

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cd4a1e6a/test/pig/org/apache/cassandra/pig/CqlTableTest.java
----------------------------------------------------------------------
diff --cc test/pig/org/apache/cassandra/pig/CqlTableTest.java
index 72fdd5a,2e1758e..3902fce
--- a/test/pig/org/apache/cassandra/pig/CqlTableTest.java
+++ b/test/pig/org/apache/cassandra/pig/CqlTableTest.java
@@@ -265,4 -317,33 +272,32 @@@ public class CqlTableTest extends PigTe
              Assert.fail("Can't fetch any data");
          }
      }
+ 
+     @Test
 -    public void testCqlStorageSingleKeyTableBulkLoad()
 -    throws AuthenticationException, AuthorizationException, 
InvalidRequestException, UnavailableException, TimedOutException, TException, 
NotFoundException, SchemaDisagreementException, IOException
++    public void testCqlStorageSingleKeyTableBulkLoad() throws TException, 
IOException
+     {
+         pig.setBatchOn();
+         //input_cql=select * from moredata where token(x) > ? and token(x) <= ?
+         pig.registerQuery("moretestvalues= LOAD 'cql://cql3ks/moredata?" + 
defaultParameters + nativeParameters + 
"&input_cql=select%20*%20from%20moredata%20where%20token(x)%20%3E%20%3F%20and%20token(x)%20%3C%3D%20%3F'
 USING CqlNativeStorage();");
+         pig.registerQuery("insertformat= FOREACH moretestvalues GENERATE 
TOTUPLE(x, y);");
+         pig.registerQuery("STORE insertformat INTO 'cql://cql3ks/test_bulk?" 
+ defaultParameters + nativeParameters +  
"&bulk_output_format=true&bulk_cf_schema=CREATE%20TABLE%20cql3ks.test_bulk%20(a%20int%20PRIMARY%20KEY%2C%20b%20int)&bulk_insert_statement=Insert%20into%20cql3ks.test_bulk(a%2C%20b)%20values(%3F%2C%3F)'
 USING CqlNativeStorage();");
+         pig.executeBatch();
+ 
+         //(5,5)
+         //(6,6)
+         //(4,4)
+         //(2,2)
+         //(3,3)
+         //(1,1)
+         //input_cql=select * from test_bulk1 where token(a) > ? and token(a) <= ?
+         pig.registerQuery("result= LOAD 'cql://cql3ks/test_bulk?" + 
defaultParameters + nativeParameters + 
"&input_cql=select%20*%20from%20test_bulk%20where%20token(a)%20%3E%20%3F%20and%20token(a)%20%3C%3D%20%3F'
 USING CqlNativeStorage();");
+         Iterator<Tuple> it = pig.openIterator("result");
+         int count = 0;
+         while (it.hasNext()) {
+             Tuple t = it.next();
+             Assert.assertEquals(t.get(0), t.get(1));
+             count ++;
+         }
+         Assert.assertEquals(6, count);
+      }
  }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cd4a1e6a/test/pig/org/apache/cassandra/pig/PigTestBase.java
----------------------------------------------------------------------
diff --cc test/pig/org/apache/cassandra/pig/PigTestBase.java
index 8c27f6c,e6964f8..a8a9de5
--- a/test/pig/org/apache/cassandra/pig/PigTestBase.java
+++ b/test/pig/org/apache/cassandra/pig/PigTestBase.java
@@@ -54,7 -65,8 +54,7 @@@ public class PigTestBase extends Schema
      protected static Configuration conf;
      protected static MiniCluster cluster; 
      protected static PigServer pig;
-     protected static String defaultParameters= 
"init_address=localhost&rpc_port=9170&partitioner=org.apache.cassandra.dht.ByteOrderedPartitioner";
 -    protected static String defaultParameters= 
"init_address=localhost&rpc_port=9170&partitioner=org.apache.cassandra.dht.ByteOrderedPartitioner"
 +
 -                                               
"&storage_port=7010&ssl_storage_port=7011&internode_encrypt=NONE";
++    protected static String defaultParameters= 
"init_address=localhost&rpc_port=9170&partitioner=org.apache.cassandra.dht.Murmur3Partitioner";
      protected static String nativeParameters = 
"&core_conns=2&max_conns=10&min_simult_reqs=3&max_simult_reqs=10&native_timeout=10000000"
  +
                                                 
"&native_read_timeout=10000000&send_buff_size=4096&receive_buff_size=4096&solinger=3"
 +
                                                 
"&tcp_nodelay=true&reuse_address=true&keep_alive=true&native_port=9042";
@@@ -62,6 -74,6 +62,7 @@@
      static
      {
          System.setProperty("logback.configurationFile", "logback-test.xml");
++        System.setProperty("cassandra.config", "cassandra_pig.yaml");
      }
  
      @AfterClass

Reply via email to