Updated Branches:
  refs/heads/cassandra-1.0 4ab6fad94 -> 9ca84786b
  refs/heads/cassandra-1.1 c5986871c -> c98edc3e8


merge from 1.0


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c98edc3e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c98edc3e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c98edc3e

Branch: refs/heads/cassandra-1.1
Commit: c98edc3e81c8c1e19370802ab6c82a7e5ff00f42
Parents: c598687 9ca8478
Author: Jonathan Ellis <jbel...@apache.org>
Authored: Mon Feb 13 16:31:41 2012 -0600
Committer: Jonathan Ellis <jbel...@apache.org>
Committed: Mon Feb 13 16:31:41 2012 -0600

----------------------------------------------------------------------
 CHANGES.txt                                        |    1 +
 src/java/org/apache/cassandra/db/Table.java        |    2 +-
 .../cassandra/db/compaction/CompactionManager.java |   26 +++++++++-----
 .../cassandra/hadoop/pig/CassandraStorage.java     |    6 ++--
 .../apache/cassandra/thrift/CustomTHsHaServer.java |    8 ++++
 5 files changed, 30 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/c98edc3e/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 359e699,500b9fb..d39c9dd
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,81 -1,5 +1,82 @@@
 +1.1-dev
 + * add nodetool rebuild_index (CASSANDRA-3583)
 + * add nodetool rangekeysample (CASSANDRA-2917)
 + * Fix streaming too much data during move operations (CASSANDRA-3639)
 + * Nodetool and CLI connect to localhost by default (CASSANDRA-3568)
 + * Reduce memory used by primary index sample (CASSANDRA-3743)
 + * (Hadoop) separate input/output configurations (CASSANDRA-3197, 3765)
 + * avoid returning internal Cassandra classes over JMX (CASSANDRA-2805)
 + * add row-level isolation via SnapTree (CASSANDRA-2893)
 + * Optimize key count estimation when opening sstable on startup
 +   (CASSANDRA-2988)
 + * multi-dc replication optimization supporting CL > ONE (CASSANDRA-3577)
 + * add command to stop compactions (CASSANDRA-1740, 3566, 3582)
 + * multithreaded streaming (CASSANDRA-3494)
 + * removed in-tree redhat spec (CASSANDRA-3567)
 + * "defragment" rows for name-based queries under STCS, again (CASSANDRA-2503)
 + * Recycle commitlog segments for improved performance 
 +   (CASSANDRA-3411, 3543, 3557, 3615)
 + * update size-tiered compaction to prioritize small tiers (CASSANDRA-2407)
 + * add message expiration logic to OutboundTcpConnection (CASSANDRA-3005)
 + * off-heap cache to use sun.misc.Unsafe instead of JNA (CASSANDRA-3271)
 + * EACH_QUORUM is only supported for writes (CASSANDRA-3272)
 + * replace compactionlock use in schema migration by checking CFS.isValid
 +   (CASSANDRA-3116)
 + * recognize that "SELECT first ... *" isn't really "SELECT *" (CASSANDRA-3445)
 + * Use faster bytes comparison (CASSANDRA-3434)
 + * Bulk loader is no longer a fat client, (HADOOP) bulk load output format
 +   (CASSANDRA-3045)
 + * (Hadoop) add support for KeyRange.filter
 + * remove assumption that keys and token are in bijection
 +   (CASSANDRA-1034, 3574, 3604)
 + * always remove endpoints from delevery queue in HH (CASSANDRA-3546)
 + * fix race between cf flush and its 2ndary indexes flush (CASSANDRA-3547)
 + * fix potential race in AES when a repair fails (CASSANDRA-3548)
 + * Remove columns shadowed by a deleted container even when we cannot purge
 +   (CASSANDRA-3538)
 + * Improve memtable slice iteration performance (CASSANDRA-3545)
 + * more efficient allocation of small bloom filters (CASSANDRA-3618)
 + * Use separate writer thread in SSTableSimpleUnsortedWriter (CASSANDRA-3619)
 + * fsync the directory after new sstable or commitlog segment are created (CASSANDRA-3250)
 + * fix minor issues reported by FindBugs (CASSANDRA-3658)
 + * global key/row caches (CASSANDRA-3143, 3849)
 + * optimize memtable iteration during range scan (CASSANDRA-3638)
 + * introduce 'crc_check_chance' in CompressionParameters to support
 +   a checksum percentage checking chance similarly to read-repair (CASSANDRA-3611)
 + * a way to deactivate global key/row cache on per-CF basis (CASSANDRA-3667)
 + * fix LeveledCompactionStrategy broken because of generation pre-allocation
 +   in LeveledManifest (CASSANDRA-3691)
 + * finer-grained control over data directories (CASSANDRA-2749)
 + * Fix ClassCastException during hinted handoff (CASSANDRA-3694)
 + * Upgrade Thrift to 0.7 (CASSANDRA-3213)
 + * Make stress.java insert operation to use microseconds (CASSANDRA-3725)
 + * Allows (internally) doing a range query with a limit of columns instead of
 +   rows (CASSANDRA-3742)
 + * Allow rangeSlice queries to be start/end inclusive/exclusive (CASSANDRA-3749)
 + * Fix BulkLoader to support new SSTable layout and add stream
 +   throttling to prevent an NPE when there is no yaml config (CASSANDRA-3752)
 + * Allow concurrent schema migrations (CASSANDRA-1391, 3832)
 + * Add SnapshotCommand to trigger snapshot on remote node (CASSANDRA-3721)
 + * Make CFMetaData conversions to/from thrift/native schema inverses
 +   (CASSANDRA_3559)
 + * Add initial code for CQL 3.0-beta (CASSANDRA-3781, 3753)
 + * Add wide row support for ColumnFamilyInputFormat (CASSANDRA-3264)
 + * Allow extending CompositeType comparator (CASSANDRA-3657)
 + * Avoids over-paging during get_count (CASSANDRA-3798)
 + * Add new command to rebuild a node without (repair) merkle tree calculations
 +   (CASSANDRA-3483)
 + * respect not only row cache capacity but caching mode when
 +   trying to read data (CASSANDRA-3812)
 + * fix system tests (CASSANDRA-3827)
 + * CQL support for altering key_validation_class in ALTER TABLE (CASSANDRA-3781)
 + * turn compression on by default (CASSANDRA-3871)
 + * make hexToBytes refuse invalid input (CASSANDRA-2851)
 + * Make secondary indexes CF inherit compression and compaction from their
 +   parent CF (CASSANDRA-3877)
 +
 +
  1.0.8
+  * fix race between cleanup and flush on secondary index CFSes (CASSANDRA-3712)
   * avoid including non-queried nodes in rangeslice read repair
     (CASSANDRA-3843)
   * Only snapshot CF being compacted for snapshot_before_compaction 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c98edc3e/src/java/org/apache/cassandra/db/Table.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c98edc3e/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index c02aed2,97e5067..f30510b
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@@ -1168,14 -1163,4 +1176,14 @@@ public class CompactionManager implemen
              }
          }
      }
 +
 +    public void stopCompaction(String type)
 +    {
 +        OperationType operation = OperationType.valueOf(type);
 +        for (Holder holder : CompactionExecutor.getCompactions())
 +        {
 +            if (holder.getCompactionInfo().getTaskType() == operation)
 +                holder.stop();
 +        }
 +    }
- }
+ }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c98edc3e/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
index e424c4b,0000000..970e854
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
@@@ -1,735 -1,0 +1,735 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements. See the NOTICE file distributed with this
 + * work for additional information regarding copyright ownership. The ASF
 + * licenses this file to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance with the License.
 + * You may obtain a copy of the License at
 + * 
 + * http://www.apache.org/licenses/LICENSE-2.0
 + * 
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 + * License for the specific language governing permissions and limitations under
 + * the License.
 + */
 +package org.apache.cassandra.hadoop.pig;
 +
 +import java.io.IOException;
 +import java.math.BigInteger;
 +import java.nio.ByteBuffer;
 +import java.util.*;
 +
 +import org.apache.cassandra.config.ConfigurationException;
 +import org.apache.cassandra.db.marshal.BytesType;
 +import org.apache.cassandra.db.marshal.IntegerType;
 +import org.apache.cassandra.db.marshal.TypeParser;
 +import org.apache.cassandra.thrift.*;
 +import org.apache.cassandra.utils.FBUtilities;
 +import org.apache.cassandra.utils.Hex;
 +import org.apache.cassandra.utils.UUIDGen;
 +import org.apache.commons.logging.Log;
 +import org.apache.commons.logging.LogFactory;
 +
 +import org.apache.cassandra.db.Column;
 +import org.apache.cassandra.db.IColumn;
 +import org.apache.cassandra.db.marshal.*;
 +import org.apache.cassandra.hadoop.*;
 +import org.apache.cassandra.thrift.Mutation;
 +import org.apache.cassandra.thrift.Deletion;
 +import org.apache.cassandra.thrift.ColumnOrSuperColumn;
 +import org.apache.cassandra.utils.ByteBufferUtil;
 +
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.hadoop.mapreduce.*;
 +
 +import org.apache.pig.*;
 +import org.apache.pig.backend.executionengine.ExecException;
 +import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
 +import org.apache.pig.data.*;
 +import org.apache.pig.ResourceSchema.ResourceFieldSchema;
 +import org.apache.pig.impl.util.UDFContext;
 +import org.apache.thrift.TDeserializer;
 +import org.apache.thrift.TException;
 +import org.apache.thrift.TSerializer;
 +
 +/**
 + * A LoadStoreFunc for retrieving data from and storing data to Cassandra
 + *
 + * A row from a standard CF will be returned as nested tuples: (key, ((name1, val1), (name2, val2))).
 + */
 +public class CassandraStorage extends LoadFunc implements StoreFuncInterface, 
LoadMetadata
 +{
 +    // system environment variables that can be set to configure connection info:
 +    // alternatively, Hadoop JobConf variables can be set using keys from ConfigHelper
 +    public final static String PIG_INPUT_RPC_PORT = "PIG_INPUT_RPC_PORT";
 +    public final static String PIG_INPUT_INITIAL_ADDRESS = 
"PIG_INPUT_INITIAL_ADDRESS";
 +    public final static String PIG_INPUT_PARTITIONER = 
"PIG_INPUT_PARTITIONER";
 +    public final static String PIG_OUTPUT_RPC_PORT = "PIG_OUTPUT_RPC_PORT";
 +    public final static String PIG_OUTPUT_INITIAL_ADDRESS = 
"PIG_OUTPUT_INITIAL_ADDRESS";
 +    public final static String PIG_OUTPUT_PARTITIONER = 
"PIG_OUTPUT_PARTITIONER";
 +    public final static String PIG_RPC_PORT = "PIG_RPC_PORT";
 +    public final static String PIG_INITIAL_ADDRESS = "PIG_INITIAL_ADDRESS";
 +    public final static String PIG_PARTITIONER = "PIG_PARTITIONER";
 +    public final static String PIG_INPUT_FORMAT = "PIG_INPUT_FORMAT";
 +    public final static String PIG_OUTPUT_FORMAT = "PIG_OUTPUT_FORMAT";
 +
 +    private final static String DEFAULT_INPUT_FORMAT = 
"org.apache.cassandra.hadoop.ColumnFamilyInputFormat";
 +    private final static String DEFAULT_OUTPUT_FORMAT = 
"org.apache.cassandra.hadoop.ColumnFamilyOutputFormat";
 +    
 +    private final static ByteBuffer BOUND = ByteBufferUtil.EMPTY_BYTE_BUFFER;
 +    private static final Log logger = 
LogFactory.getLog(CassandraStorage.class);
 +
 +    private ByteBuffer slice_start = BOUND;
 +    private ByteBuffer slice_end = BOUND;
 +    private boolean slice_reverse = false;
 +    private String keyspace;
 +    private String column_family;
 +    private String loadSignature;
 +    private String storeSignature;
 +
 +    private Configuration conf;
 +    private RecordReader reader;
 +    private RecordWriter writer;
 +    private String inputFormatClass;
 +    private String outputFormatClass;
 +    private int limit;
 +
 +    public CassandraStorage()
 +    {
 +        this(1024);
 +    }
 +
 +    /**
 +     * @param limit: number of columns to fetch in a slice
 +     */
 +    public CassandraStorage(int limit)
 +    {
 +        super();
 +        this.limit = limit;
 +    }
 +
 +    public int getLimit() 
 +    {
 +        return limit;
 +    }
 +
 +    @Override
 +    public Tuple getNext() throws IOException
 +    {
 +        try
 +        {
 +            // load the next pair
 +            if (!reader.nextKeyValue())
 +                return null;
 +
 +            CfDef cfDef = getCfDef(loadSignature);
 +            ByteBuffer key = (ByteBuffer)reader.getCurrentKey();
 +            SortedMap<ByteBuffer,IColumn> cf = 
(SortedMap<ByteBuffer,IColumn>)reader.getCurrentValue();
 +            assert key != null && cf != null;
 +            
 +            // and wrap it in a tuple
 +            Tuple tuple = TupleFactory.getInstance().newTuple(2);
 +            ArrayList<Tuple> columns = new ArrayList<Tuple>();
 +            tuple.set(0, new DataByteArray(key.array(), 
key.position()+key.arrayOffset(), key.limit()+key.arrayOffset()));
 +            for (Map.Entry<ByteBuffer, IColumn> entry : cf.entrySet())
 +            {
 +                columns.add(columnToTuple(entry.getValue(), cfDef, 
parseType(cfDef.getComparator_type())));
 +            }
 +
 +            tuple.set(1, new DefaultDataBag(columns));
 +            return tuple;
 +        }
 +        catch (InterruptedException e)
 +        {
 +            throw new IOException(e.getMessage());
 +        }
 +    }
 +
 +    private Tuple columnToTuple(IColumn col, CfDef cfDef, AbstractType 
comparator) throws IOException
 +    {
 +        Tuple pair = TupleFactory.getInstance().newTuple(2);
 +        List<AbstractType> marshallers = getDefaultMarshallers(cfDef);
 +        Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef);
 +
 +        setTupleValue(pair, 0, comparator.compose(col.name()));
 +        if (col instanceof Column)
 +        {
 +            // standard
 +            if (validators.get(col.name()) == null)
 +                setTupleValue(pair, 1, 
marshallers.get(1).compose(col.value()));
 +            else
 +                setTupleValue(pair, 1, 
validators.get(col.name()).compose(col.value()));
 +            return pair;
 +        }
 +        else
 +        {
 +            // super
 +            ArrayList<Tuple> subcols = new ArrayList<Tuple>();
 +            for (IColumn subcol : col.getSubColumns())
 +                subcols.add(columnToTuple(subcol, cfDef, 
parseType(cfDef.getSubcomparator_type())));
 +
 +            pair.set(1, new DefaultDataBag(subcols));
 +        }
 +        return pair;
 +    }
 +
 +    private void setTupleValue(Tuple pair, int position, Object value) throws 
ExecException
 +    {
 +       if (value instanceof BigInteger)
 +           pair.set(position, ((BigInteger) value).intValue());
 +       else if (value instanceof ByteBuffer)
 +           pair.set(position, new 
DataByteArray(ByteBufferUtil.getArray((ByteBuffer) value)));
 +       else if (value instanceof UUID)
 +           pair.set(position, new 
DataByteArray(UUIDGen.decompose((java.util.UUID) value)));
 +       else
 +           pair.set(position, value);
 +    }
 +
 +    private CfDef getCfDef(String signature)
 +    {
 +        UDFContext context = UDFContext.getUDFContext();
 +        Properties property = 
context.getUDFProperties(CassandraStorage.class);
 +        return cfdefFromString(property.getProperty(signature));
 +    }
 +
 +    private List<AbstractType> getDefaultMarshallers(CfDef cfDef) throws 
IOException
 +    {
 +        ArrayList<AbstractType> marshallers = new ArrayList<AbstractType>();
 +        AbstractType comparator;
 +        AbstractType subcomparator;
 +        AbstractType default_validator;
 +        AbstractType key_validator;
 +        try
 +        {
 +            comparator = TypeParser.parse(cfDef.getComparator_type());
 +            subcomparator = TypeParser.parse(cfDef.getSubcomparator_type());
 +            default_validator = 
TypeParser.parse(cfDef.getDefault_validation_class());
 +            key_validator = TypeParser.parse(cfDef.getKey_validation_class());
 +        }
 +        catch (ConfigurationException e)
 +        {
 +            throw new IOException(e);
 +        }
 +
 +        marshallers.add(comparator);
 +        marshallers.add(default_validator);
 +        marshallers.add(key_validator);
 +        marshallers.add(subcomparator);
 +        return marshallers;
 +    }
 +
 +    private Map<ByteBuffer, AbstractType> getValidatorMap(CfDef cfDef) throws 
IOException
 +    {
 +        Map<ByteBuffer, AbstractType> validators = new HashMap<ByteBuffer, 
AbstractType>();
 +        for (ColumnDef cd : cfDef.getColumn_metadata())
 +        {
 +            if (cd.getValidation_class() != null && 
!cd.getValidation_class().isEmpty())
 +            {
 +                AbstractType validator = null;
 +                try
 +                {
 +                    validator = TypeParser.parse(cd.getValidation_class());
 +                    validators.put(cd.name, validator);
 +                }
 +                catch (ConfigurationException e)
 +                {
 +                    throw new IOException(e);
 +                }
 +            }
 +        }
 +        return validators;
 +    }
 +
 +    private AbstractType parseType(String type) throws IOException
 +    {
 +        try
 +        {
 +            return TypeParser.parse(type);
 +        }
 +        catch (ConfigurationException e)
 +        {
 +            throw new IOException(e);
 +        }
 +    }
 +
 +    @Override
 +    public InputFormat getInputFormat()
 +    {
 +        try
 +        {
 +            return FBUtilities.construct(inputFormatClass, "inputformat");
 +        }
 +        catch (ConfigurationException e)
 +        {
 +            throw new RuntimeException(e);
 +        }
 +    }
 +
 +    @Override
 +    public void prepareToRead(RecordReader reader, PigSplit split)
 +    {
 +        this.reader = reader;
 +    }
 +
 +    public static Map<String, String> getQueryMap(String query)
 +    {
 +        String[] params = query.split("&");
 +        Map<String, String> map = new HashMap<String, String>();
 +        for (String param : params)
 +        {
 +            String[] keyValue = param.split("=");
 +            map.put(keyValue[0], keyValue[1]);
 +        }
 +        return map;
 +    }
 +
 +    private void setLocationFromUri(String location) throws IOException
 +    {
 +        // parse uri into keyspace and columnfamily
 +        String names[];
 +        try
 +        {
 +            if (!location.startsWith("cassandra://"))
 +                throw new Exception("Bad scheme.");
 +            String[] urlParts = location.split("\\?");
 +            if (urlParts.length > 1)
 +            {
 +                Map<String, String> urlQuery = getQueryMap(urlParts[1]);
 +                AbstractType comparator = BytesType.instance;
 +                if (urlQuery.containsKey("comparator"))
 +                    comparator = TypeParser.parse(urlQuery.get("comparator"));
 +                if (urlQuery.containsKey("slice_start"))
 +                    slice_start = 
comparator.fromString(urlQuery.get("slice_start"));
 +                if (urlQuery.containsKey("slice_end"))
 +                    slice_end = 
comparator.fromString(urlQuery.get("slice_end"));
 +                if (urlQuery.containsKey("reversed"))
 +                    slice_reverse = 
Boolean.parseBoolean(urlQuery.get("reversed"));
 +                if (urlQuery.containsKey("limit"))
 +                    limit = Integer.parseInt(urlQuery.get("limit"));
 +            }
 +            String[] parts = urlParts[0].split("/+");
 +            keyspace = parts[1];
 +            column_family = parts[2];
 +        }
 +        catch (Exception e)
 +        {
 +            throw new IOException("Expected 'cassandra://<keyspace>/<columnfamily>[?slice_start=<start>&slice_end=<end>[&reversed=true][&limit=1]]': " + e.getMessage());
 +        }
 +    }
 +
 +    private void setConnectionInformation() throws IOException
 +    {
 +        if (System.getenv(PIG_RPC_PORT) != null)
 +        {
 +            ConfigHelper.setInputRpcPort(conf, System.getenv(PIG_RPC_PORT));
 +            ConfigHelper.setOutputRpcPort(conf, System.getenv(PIG_RPC_PORT));
 +        }
 +
 +        if (System.getenv(PIG_INPUT_RPC_PORT) != null)
 +            ConfigHelper.setInputRpcPort(conf, 
System.getenv(PIG_INPUT_RPC_PORT));
 +        if (System.getenv(PIG_OUTPUT_RPC_PORT) != null)
 +            ConfigHelper.setOutputRpcPort(conf, 
System.getenv(PIG_OUTPUT_RPC_PORT));
 +
 +        if (System.getenv(PIG_INITIAL_ADDRESS) != null)
 +        {
 +            ConfigHelper.setInputInitialAddress(conf, 
System.getenv(PIG_INITIAL_ADDRESS));
 +            ConfigHelper.setOutputInitialAddress(conf, 
System.getenv(PIG_INITIAL_ADDRESS));
 +        }
 +        if (System.getenv(PIG_INPUT_INITIAL_ADDRESS) != null)
 +            ConfigHelper.setInputInitialAddress(conf, 
System.getenv(PIG_INPUT_INITIAL_ADDRESS));
 +        if (System.getenv(PIG_OUTPUT_INITIAL_ADDRESS) != null)
 +            ConfigHelper.setOutputInitialAddress(conf, 
System.getenv(PIG_OUTPUT_INITIAL_ADDRESS));
 +
 +        if (System.getenv(PIG_PARTITIONER) != null)
 +        {
 +            ConfigHelper.setInputPartitioner(conf, 
System.getenv(PIG_PARTITIONER));
 +            ConfigHelper.setOutputPartitioner(conf, 
System.getenv(PIG_PARTITIONER));
 +        }
 +        if(System.getenv(PIG_INPUT_PARTITIONER) != null)
 +            ConfigHelper.setInputPartitioner(conf, 
System.getenv(PIG_INPUT_PARTITIONER));
 +        if(System.getenv(PIG_OUTPUT_PARTITIONER) != null)
 +            ConfigHelper.setOutputPartitioner(conf, 
System.getenv(PIG_OUTPUT_PARTITIONER));
 +        if (System.getenv(PIG_INPUT_FORMAT) != null)
 +            inputFormatClass = 
getFullyQualifiedClassName(System.getenv(PIG_INPUT_FORMAT));
 +        else
 +            inputFormatClass = DEFAULT_INPUT_FORMAT;
 +        if (System.getenv(PIG_OUTPUT_FORMAT) != null)
 +            outputFormatClass = 
getFullyQualifiedClassName(System.getenv(PIG_OUTPUT_FORMAT));
 +        else
 +            outputFormatClass = DEFAULT_OUTPUT_FORMAT;
 +    }
 +    
 +    private String getFullyQualifiedClassName(String classname)
 +    {
 +        return classname.contains(".") ? classname : 
"org.apache.cassandra.hadoop." + classname;
 +    }
 +
 +    @Override
 +    public void setLocation(String location, Job job) throws IOException
 +    {
 +        conf = job.getConfiguration();
 +        setLocationFromUri(location);
 +        if (ConfigHelper.getInputSlicePredicate(conf) == null)
 +        {
 +            SliceRange range = new SliceRange(slice_start, slice_end, 
slice_reverse, limit);
 +            SlicePredicate predicate = new 
SlicePredicate().setSlice_range(range);
 +            ConfigHelper.setInputSlicePredicate(conf, predicate);
 +        }
 +        ConfigHelper.setInputColumnFamily(conf, keyspace, column_family);
 +        setConnectionInformation();
 +
 +        if (ConfigHelper.getInputRpcPort(conf) == 0)
 +            throw new IOException("PIG_INPUT_RPC_PORT or PIG_RPC_PORT environment variable not set");
 +        if (ConfigHelper.getInputInitialAddress(conf) == null)
 +            throw new IOException("PIG_INPUT_INITIAL_ADDRESS or PIG_INITIAL_ADDRESS environment variable not set");
 +        if (ConfigHelper.getInputPartitioner(conf) == null)
 +            throw new IOException("PIG_INPUT_PARTITIONER or PIG_PARTITIONER environment variable not set");
 +
 +        initSchema(loadSignature);
 +    }
 +
 +    public ResourceSchema getSchema(String location, Job job) throws 
IOException
 +    {
 +        setLocation(location, job);
 +        CfDef cfDef = getCfDef(loadSignature);
 +
 +        if (cfDef.column_type.equals("Super"))
 +            return null;
 +        // top-level schema, no type
 +        ResourceSchema schema = new ResourceSchema();
 +
 +        // get default marshallers and validators
 +        List<AbstractType> marshallers = getDefaultMarshallers(cfDef);
 +        Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef);
 +
 +        // add key
 +        ResourceFieldSchema keyFieldSchema = new ResourceFieldSchema();
 +        keyFieldSchema.setName("key");
 +        keyFieldSchema.setType(getPigType(marshallers.get(2)));
 +
 +        // will become the bag of tuples
 +        ResourceFieldSchema bagFieldSchema = new ResourceFieldSchema();
 +        bagFieldSchema.setName("columns");
 +        bagFieldSchema.setType(DataType.BAG);
 +        ResourceSchema bagSchema = new ResourceSchema();
 +
 +        List<ResourceFieldSchema> tupleFields = new 
ArrayList<ResourceFieldSchema>();
 +
 +        // default comparator/validator
 +        ResourceSchema innerTupleSchema = new ResourceSchema();
 +        ResourceFieldSchema tupleField = new ResourceFieldSchema();
 +        tupleField.setType(DataType.TUPLE);
 +        tupleField.setSchema(innerTupleSchema);
 +
 +        ResourceFieldSchema colSchema = new ResourceFieldSchema();
 +        colSchema.setName("name");
 +        colSchema.setType(getPigType(marshallers.get(0)));
 +        tupleFields.add(colSchema);
 +
 +        ResourceFieldSchema valSchema = new ResourceFieldSchema();
 +        AbstractType validator = marshallers.get(1);
 +        valSchema.setName("value");
 +        valSchema.setType(getPigType(validator));
 +        tupleFields.add(valSchema);
 +
 +        // defined validators/indexes
 +        for (ColumnDef cdef : cfDef.column_metadata)
 +        {
 +            colSchema = new ResourceFieldSchema();
 +            colSchema.setName(new String(cdef.getName()));
 +            colSchema.setType(getPigType(marshallers.get(0)));
 +            tupleFields.add(colSchema);
 +
 +            valSchema = new ResourceFieldSchema();
 +            validator = validators.get(ByteBuffer.wrap(cdef.getName()));
 +            if (validator == null)
 +                validator = marshallers.get(1);
 +            valSchema.setName("value");
 +            valSchema.setType(getPigType(validator));
 +            tupleFields.add(valSchema);
 +        }
 +        innerTupleSchema.setFields(tupleFields.toArray(new 
ResourceFieldSchema[tupleFields.size()]));
 +
 +        // a bag can contain only one tuple, but that tuple can contain anything
 +        bagSchema.setFields(new ResourceFieldSchema[] { tupleField });
 +        bagFieldSchema.setSchema(bagSchema);
 +        // top level schema contains everything
 +        schema.setFields(new ResourceFieldSchema[] { keyFieldSchema, 
bagFieldSchema });
 +        return schema;
 +    }
 +
 +    private byte getPigType(AbstractType type)
 +    {
 +        if (type instanceof LongType)
 +            return DataType.LONG;
 +        else if (type instanceof IntegerType)
 +            return DataType.INTEGER;
 +        else if (type instanceof AsciiType)
 +            return DataType.CHARARRAY;
 +        else if (type instanceof UTF8Type)
 +            return DataType.CHARARRAY;
 +        else if (type instanceof FloatType)
 +            return DataType.FLOAT;
 +        else if (type instanceof DoubleType)
 +            return DataType.DOUBLE;
 +        return DataType.BYTEARRAY;
 +    }
 +
 +    public ResourceStatistics getStatistics(String location, Job job)
 +    {
 +        return null;
 +    }
 +
 +    public String[] getPartitionKeys(String location, Job job)
 +    {
 +        return null;
 +    }
 +
 +    public void setPartitionFilter(Expression partitionFilter)
 +    {
 +        // no-op
 +    }
 +
 +    @Override
 +    public String relativeToAbsolutePath(String location, Path curDir) throws 
IOException
 +    {
 +        return location;
 +    }
 +
 +    @Override
 +    public void setUDFContextSignature(String signature)
 +    {
 +        this.loadSignature = signature;
 +    }
 +
 +    /* StoreFunc methods */
 +    public void setStoreFuncUDFContextSignature(String signature)
 +    {
 +        this.storeSignature = signature;
 +    }
 +
 +    public String relToAbsPathForStoreLocation(String location, Path curDir) 
throws IOException
 +    {
 +        return relativeToAbsolutePath(location, curDir);
 +    }
 +
 +    public void setStoreLocation(String location, Job job) throws IOException
 +    {
 +        conf = job.getConfiguration();
 +        setLocationFromUri(location);
 +        ConfigHelper.setOutputColumnFamily(conf, keyspace, column_family);
 +        setConnectionInformation();
 +
 +        if (ConfigHelper.getOutputRpcPort(conf) == 0)
 +            throw new IOException("PIG_OUTPUT_RPC_PORT or PIG_RPC_PORT environment variable not set");
 +        if (ConfigHelper.getOutputInitialAddress(conf) == null)
 +            throw new IOException("PIG_OUTPUT_INITIAL_ADDRESS or PIG_INITIAL_ADDRESS environment variable not set");
 +        if (ConfigHelper.getOutputPartitioner(conf) == null)
 +            throw new IOException("PIG_OUTPUT_PARTITIONER or PIG_PARTITIONER environment variable not set");
 +
 +        initSchema(storeSignature);
 +    }
 +
 +    public OutputFormat getOutputFormat()
 +    {
 +        try
 +        {
 +            return FBUtilities.construct(outputFormatClass, "outputformat");
 +        }
 +        catch (ConfigurationException e)
 +        {
 +            throw new RuntimeException(e);
 +        }
 +    }
 +
 +    public void checkSchema(ResourceSchema schema) throws IOException
 +    {
 +        // we don't care about types, they all get casted to ByteBuffers
 +    }
 +
 +    public void prepareToWrite(RecordWriter writer)
 +    {
 +        this.writer = writer;
 +    }
 +
 +    private ByteBuffer objToBB(Object o)
 +    {
 +        if (o == null)
 +            return (ByteBuffer)o;
 +        if (o instanceof java.lang.String)
-             return new ByteBuffer.wrap(DataByteArray((String)o).get());
++            return ByteBuffer.wrap(new DataByteArray((String)o).get());
 +        if (o instanceof Integer)
-             return IntegerType.instance.decompose((BigInteger)o);
++            return Int32Type.instance.decompose((Integer)o);
 +        if (o instanceof Long)
 +            return LongType.instance.decompose((Long)o);
 +        if (o instanceof Float)
 +            return FloatType.instance.decompose((Float)o);
 +        if (o instanceof Double)
 +            return DoubleType.instance.decompose((Double)o);
 +        if (o instanceof UUID)
 +            return ByteBuffer.wrap(UUIDGen.decompose((UUID) o));
-         return null;
++        return ByteBuffer.wrap(((DataByteArray) o).get());
 +    }
 +
 +    public void putNext(Tuple t) throws ExecException, IOException
 +    {
 +        ByteBuffer key = objToBB(t.get(0));
 +        DefaultDataBag pairs = (DefaultDataBag) t.get(1);
 +        ArrayList<Mutation> mutationList = new ArrayList<Mutation>();
 +        CfDef cfDef = getCfDef(storeSignature);
 +        try
 +        {
 +            for (Tuple pair : pairs)
 +            {
 +               Mutation mutation = new Mutation();
 +               if (DataType.findType(pair.get(1)) == DataType.BAG) // supercolumn
 +               {
 +                   org.apache.cassandra.thrift.SuperColumn sc = new 
org.apache.cassandra.thrift.SuperColumn();
 +                   sc.name = objToBB(pair.get(0));
 +                   ArrayList<org.apache.cassandra.thrift.Column> columns = new ArrayList<org.apache.cassandra.thrift.Column>();
 +                   for (Tuple subcol : (DefaultDataBag) pair.get(1))
 +                   {
 +                       org.apache.cassandra.thrift.Column column = new org.apache.cassandra.thrift.Column();
 +                       column.name = objToBB(subcol.get(0));
 +                       column.value = objToBB(subcol.get(1));
 +                       column.setTimestamp(System.currentTimeMillis() * 1000);
 +                       columns.add(column);
 +                   }
 +                   if (columns.isEmpty()) // a deletion
 +                   {
 +                       mutation.deletion = new Deletion();
 +                       mutation.deletion.super_column = objToBB(pair.get(0));
 +                       mutation.deletion.setTimestamp(System.currentTimeMillis() * 1000);
 +                   }
 +                   else
 +                   {
 +                       sc.columns = columns;
 +                       mutation.column_or_supercolumn = new ColumnOrSuperColumn();
 +                       mutation.column_or_supercolumn.super_column = sc;
 +                   }
 +               }
 +               else // assume column since it couldn't be anything else
 +               {
 +                   if (pair.get(1) == null)
 +                   {
 +                       mutation.deletion = new Deletion();
 +                       mutation.deletion.predicate = new org.apache.cassandra.thrift.SlicePredicate();
 +                       mutation.deletion.predicate.column_names = Arrays.asList(objToBB(pair.get(0)));
 +                       mutation.deletion.setTimestamp(System.currentTimeMillis() * 1000);
 +                   }
 +                   else
 +                   {
 +                       org.apache.cassandra.thrift.Column column = new org.apache.cassandra.thrift.Column();
 +                       column.name = objToBB(pair.get(0));
 +                       column.value = objToBB(pair.get(1));
 +                       column.setTimestamp(System.currentTimeMillis() * 1000);
 +                       mutation.column_or_supercolumn = new ColumnOrSuperColumn();
 +                       mutation.column_or_supercolumn.column = column;
 +                   }
 +               }
 +               mutationList.add(mutation);
 +            }
 +        }
 +        catch (ClassCastException e)
 +        {
 +            throw new IOException(e + " Output must be (key, {(column,value)...}) for ColumnFamily or (key, {supercolumn:{(column,value)...}...}) for SuperColumnFamily", e);
 +        }
 +        try
 +        {
 +            writer.write(key, mutationList);
 +        }
 +        catch (InterruptedException e)
 +        {
 +           throw new IOException(e);
 +        }
 +    }
 +
 +    public void cleanupOnFailure(String failure, Job job)
 +    {
 +    }
 +
 +    /* Methods to get the column family schema from Cassandra */
 +
 +    private void initSchema(String signature)
 +    {
 +        UDFContext context = UDFContext.getUDFContext();
 +        Properties property = context.getUDFProperties(CassandraStorage.class);
 +
 +        // Only get the schema if we haven't already gotten it
 +        if (!property.containsKey(signature))
 +        {
 +            Cassandra.Client client = null;
 +            try
 +            {
 +                client = ConfigHelper.getClientFromInputAddressList(conf);
 +                CfDef cfDef = null;
 +                client.set_keyspace(keyspace);
 +                KsDef ksDef = client.describe_keyspace(keyspace);
 +                List<CfDef> defs = ksDef.getCf_defs();
 +                for (CfDef def : defs)
 +                {
 +                    if (column_family.equalsIgnoreCase(def.getName()))
 +                    {
 +                        cfDef = def;
 +                        break;
 +                    }
 +                }
 +                if (cfDef != null)
 +                    property.setProperty(signature, cfdefToString(cfDef));
 +                else
 +                    throw new RuntimeException("Column family '" + column_family + "' not found in keyspace '" + keyspace + "'");
 +            }
 +            catch (TException e)
 +            {
 +                throw new RuntimeException(e);
 +            }
 +            catch (InvalidRequestException e)
 +            {
 +                throw new RuntimeException(e);
 +            }
 +            catch (NotFoundException e)
 +            {
 +                throw new RuntimeException(e);
 +            }
 +            catch (IOException e)
 +            {
 +                throw new RuntimeException(e);
 +            }
 +        }
 +    }
 +
 +    private static String cfdefToString(CfDef cfDef)
 +    {
 +        assert cfDef != null;
 +        // this is so awful it's kind of cool!
 +        TSerializer serializer = new TSerializer(new TBinaryProtocol.Factory());
 +        try
 +        {
 +            return Hex.bytesToHex(serializer.serialize(cfDef));
 +        }
 +        catch (TException e)
 +        {
 +            throw new RuntimeException(e);
 +        }
 +    }
 +
 +    private static CfDef cfdefFromString(String st)
 +    {
 +        assert st != null;
 +        TDeserializer deserializer = new TDeserializer(new TBinaryProtocol.Factory());
 +        CfDef cfDef = new CfDef();
 +        try
 +        {
 +            deserializer.deserialize(cfDef, Hex.hexToBytes(st));
 +        }
 +        catch (TException e)
 +        {
 +            throw new RuntimeException(e);
 +        }
 +        return cfDef;
 +    }
 +}
 +

Reply via email to