Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.2 16b02887d -> 193665757
  refs/heads/trunk 78c03052c -> 9718e13d7
Remove use of Cell in Thrift M/R classes

Patch and review by Philip Thompson and Sam Tunnicliffe for CASSANDRA-8609

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/19366575
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/19366575
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/19366575

Branch: refs/heads/cassandra-2.2
Commit: 1936657570655129052cc48fa373c155086a6456
Parents: 16b0288
Author: Sam Tunnicliffe <[email protected]>
Authored: Tue Jun 2 15:56:36 2015 +0100
Committer: Sam Tunnicliffe <[email protected]>
Committed: Thu Jun 4 10:46:57 2015 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 examples/hadoop_cql3_word_count/README.txt      |  14 +-
 .../bin/word_count_counters                     |   1 +
 .../conf/log4j.properties                       |  32 -
 .../hadoop_cql3_word_count/conf/logback.xml     |  42 +
 .../hadoop_cql3_word_count/src/WordCount.java   |   4 +-
 examples/hadoop_word_count/README.txt           |  10 +-
 .../hadoop_word_count/bin/word_count_counters   |   1 +
 .../hadoop_word_count/conf/log4j.properties     |  32 -
 examples/hadoop_word_count/conf/logback.xml     |  42 +
 examples/hadoop_word_count/src/WordCount.java   |  20 +-
 .../src/WordCountCounters.java                  |  24 +-
 .../hadoop/ColumnFamilyInputFormat.java         |   7 +-
 .../hadoop/ColumnFamilyRecordReader.java        | 117 +--
 .../hadoop/pig/AbstractCassandraStorage.java    | 796 -------------------
 .../cassandra/hadoop/pig/CassandraStorage.java  |  35 +-
 .../cassandra/hadoop/pig/CqlNativeStorage.java  |  12 +-
 test/conf/logback-test.xml                      |   4 +-
 18 files changed, 218 insertions(+), 976 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/19366575/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 933f5a6..882279f 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.2
+ * Remove use of Cell in Thrift MapReduce classes (CASSANDRA-8609)
  * Integrate pre-release Java Driver 2.2-rc1, custom build (CASSANDRA-9493)
  * Clean up gossiper logic for old versions (CASSANDRA-9370)
  * Fix custom payload coding/decoding to match the spec (CASSANDRA-9515)


http://git-wip-us.apache.org/repos/asf/cassandra/blob/19366575/examples/hadoop_cql3_word_count/README.txt
----------------------------------------------------------------------
diff --git a/examples/hadoop_cql3_word_count/README.txt b/examples/hadoop_cql3_word_count/README.txt
index b69bdd5..b6ee33f 100644
--- a/examples/hadoop_cql3_word_count/README.txt
+++ b/examples/hadoop_cql3_word_count/README.txt
@@ -5,14 +5,16 @@ WordCount hadoop example: Inserts a bunch of words across multiple rows, and
 counts them, with RandomPartitioner. The word_count_counters example sums
 the value of counter columns for a key.
 
-The scripts in bin/ assume you are running with cwd of contrib/word_count.
+The scripts in bin/ assume you are running with cwd of examples/word_count.
 
 
 Running
 =======
 
-First build and start a Cassandra server with the default configuration*,
-then run
+First build and start a Cassandra server with the default configuration*. Ensure that the Thrift
+interface is enabled, either by setting start_rpc:true in cassandra.yaml or by running
+`nodetool enablethrift` after startup.
+Once Cassandra has started and the Thrift interface is available, run
 
 contrib/word_count$ ant
 contrib/word_count$ bin/word_count_setup
@@ -22,14 +24,14 @@ contrib/word_count$ bin/word_count_counters
 In order to view the results in Cassandra, one can use bin/cqlsh and
 perform the following operations:
 $ bin/cqlsh localhost
-> use cql3_worldcount;
+> use cql3_wordcount;
 > select * from output_words;
 
 The output of the word count can now be configured. In the bin/word_count
 file, you can specify the OUTPUT_REDUCER. The two options are 'filesystem'
 and 'cassandra'. The filesystem option outputs to the /tmp/word_count*
 directories. The cassandra option outputs to the 'output_words' column family
-in the 'cql3_worldcount' keyspace. 'cassandra' is the default.
+in the 'cql3_wordcount' keyspace. 'cassandra' is the default.
 
 Read the code in src/ for more details.
 
@@ -45,5 +47,5 @@ settings accordingly.
 
 Troubleshooting
 ===============
-word_count uses conf/log4j.properties to log to wc.out.
+word_count uses conf/logback.xml to log to wc.out.


http://git-wip-us.apache.org/repos/asf/cassandra/blob/19366575/examples/hadoop_cql3_word_count/bin/word_count_counters
----------------------------------------------------------------------
diff --git a/examples/hadoop_cql3_word_count/bin/word_count_counters b/examples/hadoop_cql3_word_count/bin/word_count_counters
index 0b69b40..52ea2e5 100755
--- a/examples/hadoop_cql3_word_count/bin/word_count_counters
+++ b/examples/hadoop_cql3_word_count/bin/word_count_counters
@@ -30,6 +30,7 @@ if [ ! -e $cwd/../build/word_count.jar ]; then
     exit 1
 fi
 
+CLASSPATH=$CLASSPATH:$cwd/../conf
 CLASSPATH=$CLASSPATH:$cwd/../build/word_count.jar
 CLASSPATH=$CLASSPATH:$cwd/../../../build/classes/main
 CLASSPATH=$CLASSPATH:$cwd/../../../build/classes/thrift


http://git-wip-us.apache.org/repos/asf/cassandra/blob/19366575/examples/hadoop_cql3_word_count/conf/log4j.properties
----------------------------------------------------------------------
diff --git a/examples/hadoop_cql3_word_count/conf/log4j.properties b/examples/hadoop_cql3_word_count/conf/log4j.properties
deleted file mode 100644
index 508d60f..0000000
--- a/examples/hadoop_cql3_word_count/conf/log4j.properties
+++ /dev/null
@@ -1,32 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
- -log4j.rootLogger=INFO,stdout,F - -#stdout -log4j.appender.stdout=org.apache.log4j.ConsoleAppender -log4j.appender.stdout.layout=org.apache.log4j.PatternLayout -log4j.appender.stdout.layout.ConversionPattern=%5p %d{HH:mm:ss,SSS} %m%n - -# log file -log4j.appender.F=org.apache.log4j.FileAppender -log4j.appender.F.Append=false -log4j.appender.F.layout=org.apache.log4j.PatternLayout -log4j.appender.F.layout.ConversionPattern=%5p [%t] %d{ISO8601} %F (line %L) %m%n -# Edit the next line to point to your logs directory -log4j.appender.F.File=wc.out - http://git-wip-us.apache.org/repos/asf/cassandra/blob/19366575/examples/hadoop_cql3_word_count/conf/logback.xml ---------------------------------------------------------------------- diff --git a/examples/hadoop_cql3_word_count/conf/logback.xml b/examples/hadoop_cql3_word_count/conf/logback.xml new file mode 100644 index 0000000..443bd1c --- /dev/null +++ b/examples/hadoop_cql3_word_count/conf/logback.xml @@ -0,0 +1,42 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. +--> + +<configuration scan="true"> + + <jmxConfigurator /> + + <appender name="FILE" class="ch.qos.logback.core.FileAppender"> + <file>wc.out</file> + <encoder> + <pattern>%-5level [%thread] %date{ISO8601} %F:%L - %msg%n</pattern> + </encoder> + </appender> + + <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> + <encoder> + <pattern>%-5level %date{HH:mm:ss,SSS} %msg%n</pattern> + </encoder> + </appender> + + <root level="INFO"> + <appender-ref ref="FILE" /> + <appender-ref ref="STDOUT" /> + </root> + +</configuration> http://git-wip-us.apache.org/repos/asf/cassandra/blob/19366575/examples/hadoop_cql3_word_count/src/WordCount.java ---------------------------------------------------------------------- diff --git a/examples/hadoop_cql3_word_count/src/WordCount.java b/examples/hadoop_cql3_word_count/src/WordCount.java index 3702a2b..bc95736 100644 --- a/examples/hadoop_cql3_word_count/src/WordCount.java +++ b/examples/hadoop_cql3_word_count/src/WordCount.java @@ -44,7 +44,7 @@ import com.datastax.driver.core.Row; /** * This counts the occurrences of words in ColumnFamily - * cql3_worldcount ( id uuid, + * cql3_wordcount ( id uuid, * line text, * PRIMARY KEY (id)) * @@ -60,7 +60,7 @@ public class WordCount extends Configured implements Tool { private static final Logger logger = LoggerFactory.getLogger(WordCount.class); static final String INPUT_MAPPER_VAR = "input_mapper"; - static final String KEYSPACE = "cql3_worldcount"; + static final String KEYSPACE = "cql3_wordcount"; static final String COLUMN_FAMILY = "inputs"; static final String OUTPUT_REDUCER_VAR = "output_reducer"; http://git-wip-us.apache.org/repos/asf/cassandra/blob/19366575/examples/hadoop_word_count/README.txt 
---------------------------------------------------------------------- diff --git a/examples/hadoop_word_count/README.txt b/examples/hadoop_word_count/README.txt index ec6f512..e336b89 100644 --- a/examples/hadoop_word_count/README.txt +++ b/examples/hadoop_word_count/README.txt @@ -5,14 +5,16 @@ WordCount hadoop example: Inserts a bunch of words across multiple rows, and counts them, with RandomPartitioner. The word_count_counters example sums the value of counter columns for a key. -The scripts in bin/ assume you are running with cwd of contrib/word_count. +The scripts in bin/ assume you are running with cwd of examples/word_count. Running ======= -First build and start a Cassandra server with the default configuration*, -then run +First build and start a Cassandra server with the default configuration*. Ensure that the Thrift +interface is enabled, either by setting start_rpc:true in cassandra.yaml or by running +`nodetool enablethrift` after startup. +Once Cassandra has started and the Thrift interface is available, run contrib/word_count$ ant contrib/word_count$ bin/word_count_setup @@ -45,4 +47,4 @@ settings accordingly. Troubleshooting =============== -word_count uses conf/log4j.properties to log to wc.out. +word_count uses conf/logback.xml to log to wc.out. http://git-wip-us.apache.org/repos/asf/cassandra/blob/19366575/examples/hadoop_word_count/bin/word_count_counters ---------------------------------------------------------------------- diff --git a/examples/hadoop_word_count/bin/word_count_counters b/examples/hadoop_word_count/bin/word_count_counters index 7793477..58c398c 100755 --- a/examples/hadoop_word_count/bin/word_count_counters +++ b/examples/hadoop_word_count/bin/word_count_counters @@ -30,6 +30,7 @@ if [ ! -e $cwd/../build/word_count.jar ]; then exit 1 fi +CLASSPATH=$CLASSPATH:$cwd/../conf CLASSPATH=$CLASSPATH:$cwd/../build/word_count.jar CLASSPATH=$CLASSPATH:$cwd/../../../build/classes/main CLASSPATH=$CLASSPATH:$cwd/../../../build/classes/thrift http://git-wip-us.apache.org/repos/asf/cassandra/blob/19366575/examples/hadoop_word_count/conf/log4j.properties ---------------------------------------------------------------------- diff --git a/examples/hadoop_word_count/conf/log4j.properties b/examples/hadoop_word_count/conf/log4j.properties deleted file mode 100644 index 508d60f..0000000 --- a/examples/hadoop_word_count/conf/log4j.properties +++ /dev/null @@ -1,32 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. 
- -log4j.rootLogger=INFO,stdout,F - -#stdout -log4j.appender.stdout=org.apache.log4j.ConsoleAppender -log4j.appender.stdout.layout=org.apache.log4j.PatternLayout -log4j.appender.stdout.layout.ConversionPattern=%5p %d{HH:mm:ss,SSS} %m%n - -# log file -log4j.appender.F=org.apache.log4j.FileAppender -log4j.appender.F.Append=false -log4j.appender.F.layout=org.apache.log4j.PatternLayout -log4j.appender.F.layout.ConversionPattern=%5p [%t] %d{ISO8601} %F (line %L) %m%n -# Edit the next line to point to your logs directory -log4j.appender.F.File=wc.out - http://git-wip-us.apache.org/repos/asf/cassandra/blob/19366575/examples/hadoop_word_count/conf/logback.xml ---------------------------------------------------------------------- diff --git a/examples/hadoop_word_count/conf/logback.xml b/examples/hadoop_word_count/conf/logback.xml new file mode 100644 index 0000000..443bd1c --- /dev/null +++ b/examples/hadoop_word_count/conf/logback.xml @@ -0,0 +1,42 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. +--> + +<configuration scan="true"> + + <jmxConfigurator /> + + <appender name="FILE" class="ch.qos.logback.core.FileAppender"> + <file>wc.out</file> + <encoder> + <pattern>%-5level [%thread] %date{ISO8601} %F:%L - %msg%n</pattern> + </encoder> + </appender> + + <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> + <encoder> + <pattern>%-5level %date{HH:mm:ss,SSS} %msg%n</pattern> + </encoder> + </appender> + + <root level="INFO"> + <appender-ref ref="FILE" /> + <appender-ref ref="STDOUT" /> + </root> + +</configuration> http://git-wip-us.apache.org/repos/asf/cassandra/blob/19366575/examples/hadoop_word_count/src/WordCount.java ---------------------------------------------------------------------- diff --git a/examples/hadoop_word_count/src/WordCount.java b/examples/hadoop_word_count/src/WordCount.java index f6bca77..d092f1f 100644 --- a/examples/hadoop_word_count/src/WordCount.java +++ b/examples/hadoop_word_count/src/WordCount.java @@ -20,15 +20,11 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.*; -import org.apache.cassandra.db.Cell; -import org.apache.cassandra.thrift.*; -import org.apache.cassandra.utils.ByteBufferUtil; -import org.apache.cassandra.hadoop.ColumnFamilyOutputFormat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.cassandra.hadoop.ColumnFamilyInputFormat; -import org.apache.cassandra.hadoop.ConfigHelper; +import org.apache.cassandra.hadoop.*; +import org.apache.cassandra.thrift.*; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; @@ -71,7 +67,7 @@ public class WordCount extends Configured implements Tool System.exit(0); } - public static class TokenizerMapper extends Mapper<ByteBuffer, 
SortedMap<ByteBuffer, Cell>, Text, IntWritable> + public static class TokenizerMapper extends Mapper<ByteBuffer, SortedMap<ByteBuffer, ColumnFamilyRecordReader.Column>, Text, IntWritable> { private final static IntWritable one = new IntWritable(1); private Text word = new Text(); @@ -82,17 +78,17 @@ public class WordCount extends Configured implements Tool { } - public void map(ByteBuffer key, SortedMap<ByteBuffer, Cell> columns, Context context) throws IOException, InterruptedException + public void map(ByteBuffer key, SortedMap<ByteBuffer, ColumnFamilyRecordReader.Column> columns, Context context) throws IOException, InterruptedException { - for (Cell cell : columns.values()) + for (ColumnFamilyRecordReader.Column column : columns.values()) { - String name = ByteBufferUtil.string(cell.name().toByteBuffer()); + String name = ByteBufferUtil.string(column.name); String value = null; if (name.contains("int")) - value = String.valueOf(ByteBufferUtil.toInt(cell.value())); + value = String.valueOf(ByteBufferUtil.toInt(column.value)); else - value = ByteBufferUtil.string(cell.value()); + value = ByteBufferUtil.string(column.value); logger.debug("read {}:{}={} from {}", new Object[] {ByteBufferUtil.string(key), name, value, context.getInputSplit()}); http://git-wip-us.apache.org/repos/asf/cassandra/blob/19366575/examples/hadoop_word_count/src/WordCountCounters.java ---------------------------------------------------------------------- diff --git a/examples/hadoop_word_count/src/WordCountCounters.java b/examples/hadoop_word_count/src/WordCountCounters.java index 39fb778..98c8579 100644 --- a/examples/hadoop_word_count/src/WordCountCounters.java +++ b/examples/hadoop_word_count/src/WordCountCounters.java @@ -20,26 +20,26 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.SortedMap; -import org.apache.cassandra.db.Cell; -import org.apache.cassandra.thrift.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.cassandra.hadoop.ColumnFamilyInputFormat; +import org.apache.cassandra.hadoop.ColumnFamilyRecordReader; +import org.apache.cassandra.hadoop.ConfigHelper; +import org.apache.cassandra.thrift.SlicePredicate; +import org.apache.cassandra.thrift.SliceRange; +import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.Text; import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; -import org.apache.cassandra.hadoop.ColumnFamilyInputFormat; -import org.apache.cassandra.hadoop.ConfigHelper; -import org.apache.cassandra.utils.ByteBufferUtil; - /** * This sums the word count stored in the input_words_count ColumnFamily for the key "key-if-verse1". 
* @@ -60,15 +60,15 @@ public class WordCountCounters extends Configured implements Tool System.exit(0); } - public static class SumMapper extends Mapper<ByteBuffer, SortedMap<ByteBuffer, Cell>, Text, LongWritable> + public static class SumMapper extends Mapper<ByteBuffer, SortedMap<ByteBuffer, ColumnFamilyRecordReader.Column>, Text, LongWritable> { - public void map(ByteBuffer key, SortedMap<ByteBuffer, Cell> columns, Context context) throws IOException, InterruptedException + public void map(ByteBuffer key, SortedMap<ByteBuffer, ColumnFamilyRecordReader.Column> columns, Context context) throws IOException, InterruptedException { long sum = 0; - for (Cell cell : columns.values()) + for (ColumnFamilyRecordReader.Column column : columns.values()) { - logger.debug("read " + key + ":" + cell.name() + " from " + context.getInputSplit()); - sum += ByteBufferUtil.toLong(cell.value()); + logger.debug("read " + key + ":" + ByteBufferUtil.string(column.name) + " from " + context.getInputSplit()); + sum += ByteBufferUtil.toLong(column.value); } context.write(new Text(ByteBufferUtil.string(key)), new LongWritable(sum)); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/19366575/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java index f89825f..4662fa5 100644 --- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java +++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java @@ -25,7 +25,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.auth.PasswordAuthenticator; -import org.apache.cassandra.db.Cell; import org.apache.cassandra.thrift.*; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapred.JobConf; @@ -57,7 +56,7 @@ import org.apache.thrift.transport.TTransportException; * The default split size is 64k rows. 
*/ @Deprecated -public class ColumnFamilyInputFormat extends AbstractColumnFamilyInputFormat<ByteBuffer, SortedMap<ByteBuffer, Cell>> +public class ColumnFamilyInputFormat extends AbstractColumnFamilyInputFormat<ByteBuffer, SortedMap<ByteBuffer, ColumnFamilyRecordReader.Column>> { private static final Logger logger = LoggerFactory.getLogger(ColumnFamilyInputFormat.class); @@ -91,12 +90,12 @@ public class ColumnFamilyInputFormat extends AbstractColumnFamilyInputFormat<Byt return client; } - public RecordReader<ByteBuffer, SortedMap<ByteBuffer, Cell>> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException + public RecordReader<ByteBuffer, SortedMap<ByteBuffer, ColumnFamilyRecordReader.Column>> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { return new ColumnFamilyRecordReader(); } - public org.apache.hadoop.mapred.RecordReader<ByteBuffer, SortedMap<ByteBuffer, Cell>> getRecordReader(org.apache.hadoop.mapred.InputSplit split, JobConf jobConf, final Reporter reporter) throws IOException + public org.apache.hadoop.mapred.RecordReader<ByteBuffer, SortedMap<ByteBuffer, ColumnFamilyRecordReader.Column>> getRecordReader(org.apache.hadoop.mapred.InputSplit split, JobConf jobConf, final Reporter reporter) throws IOException { TaskAttemptContext tac = HadoopCompat.newMapContext( jobConf, http://git-wip-us.apache.org/repos/asf/cassandra/blob/19366575/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java index c103d75..aee730d 100644 --- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java +++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java @@ -29,9 +29,6 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.schema.LegacySchemaTables; import org.apache.cassandra.db.SystemKeyspace; -import org.apache.cassandra.db.BufferCell; -import org.apache.cassandra.db.Cell; -import org.apache.cassandra.db.composites.CellNames; import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.db.marshal.CompositeType; import org.apache.cassandra.db.marshal.TypeParser; @@ -49,8 +46,8 @@ import org.apache.thrift.TException; import org.apache.thrift.transport.TTransport; @Deprecated -public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap<ByteBuffer, Cell>> - implements org.apache.hadoop.mapred.RecordReader<ByteBuffer, SortedMap<ByteBuffer, Cell>> +public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap<ByteBuffer, ColumnFamilyRecordReader.Column>> + implements org.apache.hadoop.mapred.RecordReader<ByteBuffer, SortedMap<ByteBuffer, ColumnFamilyRecordReader.Column>> { private static final Logger logger = LoggerFactory.getLogger(ColumnFamilyRecordReader.class); @@ -58,7 +55,7 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap private ColumnFamilySplit split; private RowIterator iter; - private Pair<ByteBuffer, SortedMap<ByteBuffer, Cell>> currentRow; + private Pair<ByteBuffer, SortedMap<ByteBuffer, Column>> currentRow; private SlicePredicate predicate; private boolean isEmptyPredicate; private int totalRowCount; // total number of rows to fetch @@ -98,7 +95,7 @@ public class ColumnFamilyRecordReader extends 
RecordReader<ByteBuffer, SortedMap return currentRow.left; } - public SortedMap<ByteBuffer, Cell> getCurrentValue() + public SortedMap<ByteBuffer, Column> getCurrentValue() { return currentRow.right; } @@ -216,7 +213,7 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap return split.getLocations()[0]; } - private abstract class RowIterator extends AbstractIterator<Pair<ByteBuffer, SortedMap<ByteBuffer, Cell>>> + private abstract class RowIterator extends AbstractIterator<Pair<ByteBuffer, SortedMap<ByteBuffer, Column>>> { protected List<KeySlice> rows; protected int totalRead = 0; @@ -283,50 +280,48 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap return totalRead; } - protected List<Cell> unthriftify(ColumnOrSuperColumn cosc) + protected List<Pair<ByteBuffer, Column>> unthriftify(ColumnOrSuperColumn cosc) { if (cosc.counter_column != null) - return Collections.<Cell>singletonList(unthriftifyCounter(cosc.counter_column)); + return Collections.singletonList(unthriftifyCounter(cosc.counter_column)); if (cosc.counter_super_column != null) return unthriftifySuperCounter(cosc.counter_super_column); if (cosc.super_column != null) return unthriftifySuper(cosc.super_column); assert cosc.column != null; - return Collections.<Cell>singletonList(unthriftifySimple(cosc.column)); + return Collections.singletonList(unthriftifySimple(cosc.column)); } - private List<Cell> unthriftifySuper(SuperColumn super_column) + private List<Pair<ByteBuffer, Column>> unthriftifySuper(SuperColumn super_column) { - List<Cell> cells = new ArrayList<Cell>(super_column.columns.size()); + List<Pair<ByteBuffer, Column>> columns = new ArrayList<>(super_column.columns.size()); for (org.apache.cassandra.thrift.Column column : super_column.columns) { - Cell c = unthriftifySimple(column); - cells.add(c.withUpdatedName(CellNames.simpleDense(CompositeType.build(super_column.name, c.name().toByteBuffer())))); + Pair<ByteBuffer, Column> c = unthriftifySimple(column); + columns.add(Pair.create(CompositeType.build(super_column.name, c.left), c.right)); } - return cells; + return columns; } - protected Cell unthriftifySimple(org.apache.cassandra.thrift.Column column) + protected Pair<ByteBuffer, Column> unthriftifySimple(org.apache.cassandra.thrift.Column column) { - return new BufferCell(CellNames.simpleDense(column.name), column.value, column.timestamp); + return Pair.create(column.name, Column.fromRegularColumn(column)); } - private Cell unthriftifyCounter(CounterColumn column) + private Pair<ByteBuffer, Column> unthriftifyCounter(CounterColumn column) { - //CounterColumns read the counterID from the System keyspace, so need the StorageService running and access - //to cassandra.yaml. To avoid a Hadoop needing access to yaml return a regular Cell. 
- return new BufferCell(CellNames.simpleDense(column.name), ByteBufferUtil.bytes(column.value), 0); + return Pair.create(column.name, Column.fromCounterColumn(column)); } - private List<Cell> unthriftifySuperCounter(CounterSuperColumn super_column) + private List<Pair<ByteBuffer, Column>> unthriftifySuperCounter(CounterSuperColumn super_column) { - List<Cell> cells = new ArrayList<Cell>(super_column.columns.size()); + List<Pair<ByteBuffer, Column>> columns = new ArrayList<>(super_column.columns.size()); for (CounterColumn column : super_column.columns) { - Cell c = unthriftifyCounter(column); - cells.add(c.withUpdatedName(CellNames.simpleDense(CompositeType.build(super_column.name, c.name().toByteBuffer())))); + Pair<ByteBuffer, Column> c = unthriftifyCounter(column); + columns.add(Pair.create(CompositeType.build(super_column.name, c.left), c.right)); } - return cells; + return columns; } } @@ -405,7 +400,7 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap } } - protected Pair<ByteBuffer, SortedMap<ByteBuffer, Cell>> computeNext() + protected Pair<ByteBuffer, SortedMap<ByteBuffer, Column>> computeNext() { maybeInit(); if (rows == null) @@ -414,12 +409,12 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap totalRead++; KeySlice ks = rows.get(i++); AbstractType<?> comp = isSuper ? CompositeType.getInstance(comparator, subComparator) : comparator; - SortedMap<ByteBuffer, Cell> map = new TreeMap<ByteBuffer, Cell>(comp); + SortedMap<ByteBuffer, Column> map = new TreeMap<>(comp); for (ColumnOrSuperColumn cosc : ks.columns) { - List<Cell> cells = unthriftify(cosc); - for (Cell cell : cells) - map.put(cell.name().toByteBuffer(), cell); + List<Pair<ByteBuffer, Column>> columns = unthriftify(cosc); + for (Pair<ByteBuffer, Column> column : columns) + map.put(column.left, column.right); } return Pair.create(ks.key, map); } @@ -427,7 +422,7 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap private class WideRowIterator extends RowIterator { - private PeekingIterator<Pair<ByteBuffer, SortedMap<ByteBuffer, Cell>>> wideColumns; + private PeekingIterator<Pair<ByteBuffer, SortedMap<ByteBuffer, Column>>> wideColumns; private ByteBuffer lastColumn = ByteBufferUtil.EMPTY_BYTE_BUFFER; private ByteBuffer lastCountedKey = ByteBufferUtil.EMPTY_BYTE_BUFFER; @@ -476,13 +471,13 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap } } - protected Pair<ByteBuffer, SortedMap<ByteBuffer, Cell>> computeNext() + protected Pair<ByteBuffer, SortedMap<ByteBuffer, Column>> computeNext() { maybeInit(); if (rows == null) return endOfData(); - Pair<ByteBuffer, SortedMap<ByteBuffer, Cell>> next = wideColumns.next(); + Pair<ByteBuffer, SortedMap<ByteBuffer, Column>> next = wideColumns.next(); lastColumn = next.right.keySet().iterator().next().duplicate(); maybeIncreaseRowCounter(next); @@ -494,7 +489,7 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap * Increases the row counter only if we really moved to the next row. 
* @param next just fetched row slice */ - private void maybeIncreaseRowCounter(Pair<ByteBuffer, SortedMap<ByteBuffer, Cell>> next) + private void maybeIncreaseRowCounter(Pair<ByteBuffer, SortedMap<ByteBuffer, Column>> next) { ByteBuffer currentKey = next.left; if (!currentKey.equals(lastCountedKey)) @@ -504,7 +499,7 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap } } - private class WideColumnIterator extends AbstractIterator<Pair<ByteBuffer, SortedMap<ByteBuffer, Cell>>> + private class WideColumnIterator extends AbstractIterator<Pair<ByteBuffer, SortedMap<ByteBuffer, Column>>> { private final Iterator<KeySlice> rows; private Iterator<ColumnOrSuperColumn> columns; @@ -525,7 +520,7 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap columns = currentRow.columns.iterator(); } - protected Pair<ByteBuffer, SortedMap<ByteBuffer, Cell>> computeNext() + protected Pair<ByteBuffer, SortedMap<ByteBuffer, Column>> computeNext() { AbstractType<?> comp = isSuper ? CompositeType.getInstance(comparator, subComparator) : comparator; while (true) @@ -533,20 +528,20 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap if (columns.hasNext()) { ColumnOrSuperColumn cosc = columns.next(); - SortedMap<ByteBuffer, Cell> map; - List<Cell> cells = unthriftify(cosc); - if (cells.size() == 1) + SortedMap<ByteBuffer, Column> map; + List<Pair<ByteBuffer, Column>> columns = unthriftify(cosc); + if (columns.size() == 1) { - map = ImmutableSortedMap.of(cells.get(0).name().toByteBuffer(), cells.get(0)); + map = ImmutableSortedMap.of(columns.get(0).left, columns.get(0).right); } else { assert isSuper; - map = new TreeMap<ByteBuffer, Cell>(comp); - for (Cell cell : cells) - map.put(cell.name().toByteBuffer(), cell); + map = new TreeMap<>(comp); + for (Pair<ByteBuffer, Column> column : columns) + map.put(column.left, column.right); } - return Pair.<ByteBuffer, SortedMap<ByteBuffer, Cell>>create(currentRow.key, map); + return Pair.create(currentRow.key, map); } if (!rows.hasNext()) @@ -563,7 +558,7 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap // to the old. Thus, expect a small performance hit. // And obviously this wouldn't work for wide rows. But since ColumnFamilyInputFormat // and ColumnFamilyRecordReader don't support them, it should be fine for now. 
- public boolean next(ByteBuffer key, SortedMap<ByteBuffer, Cell> value) throws IOException + public boolean next(ByteBuffer key, SortedMap<ByteBuffer, Column> value) throws IOException { if (this.nextKeyValue()) { @@ -584,13 +579,37 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap return ByteBuffer.wrap(new byte[this.keyBufferSize]); } - public SortedMap<ByteBuffer, Cell> createValue() + public SortedMap<ByteBuffer, Column> createValue() { - return new TreeMap<ByteBuffer, Cell>(); + return new TreeMap<>(); } public long getPos() throws IOException { return iter.rowsRead(); } + + public static final class Column + { + public final ByteBuffer name; + public final ByteBuffer value; + public final long timestamp; + + private Column(ByteBuffer name, ByteBuffer value, long timestamp) + { + this.name = name; + this.value = value; + this.timestamp = timestamp; + } + + static Column fromRegularColumn(org.apache.cassandra.thrift.Column input) + { + return new Column(input.name, input.value, input.timestamp); + } + + static Column fromCounterColumn(org.apache.cassandra.thrift.CounterColumn input) + { + return new Column(input.name, ByteBufferUtil.bytes(input.value), 0); + } + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/19366575/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java b/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java deleted file mode 100644 index 263e6c0..0000000 --- a/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java +++ /dev/null @@ -1,796 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.cassandra.hadoop.pig; - -import java.io.IOException; -import java.io.UnsupportedEncodingException; -import java.math.BigInteger; -import java.net.URLDecoder; -import java.nio.ByteBuffer; -import java.nio.charset.CharacterCodingException; -import java.util.*; - -import org.apache.cassandra.transport.Server; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.cassandra.auth.PasswordAuthenticator; -import org.apache.cassandra.config.CFMetaData; -import org.apache.cassandra.config.ColumnDefinition; -import org.apache.cassandra.db.Cell; -import org.apache.cassandra.db.SystemKeyspace; -import org.apache.cassandra.db.marshal.*; -import org.apache.cassandra.db.marshal.AbstractCompositeType.CompositeComponent; -import org.apache.cassandra.exceptions.ConfigurationException; -import org.apache.cassandra.exceptions.SyntaxException; -import org.apache.cassandra.hadoop.ConfigHelper; -import org.apache.cassandra.schema.LegacySchemaTables; -import org.apache.cassandra.serializers.CollectionSerializer; -import org.apache.cassandra.thrift.*; -import org.apache.cassandra.utils.*; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapreduce.InputFormat; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.OutputFormat; -import org.apache.pig.*; -import org.apache.pig.backend.executionengine.ExecException; -import org.apache.pig.data.*; -import org.apache.pig.impl.util.UDFContext; -import org.apache.thrift.TDeserializer; -import org.apache.thrift.TException; -import org.apache.thrift.TSerializer; -import org.apache.thrift.protocol.TBinaryProtocol; - -/** - * A LoadStoreFunc for retrieving data from and storing data to Cassandra - */ -public abstract class AbstractCassandraStorage extends LoadFunc implements StoreFuncInterface, LoadMetadata -{ - - protected enum MarshallerType { COMPARATOR, DEFAULT_VALIDATOR, KEY_VALIDATOR, SUBCOMPARATOR }; - - // system environment variables that can be set to configure connection info: - // alternatively, Hadoop JobConf variables can be set using keys from ConfigHelper - public final static String PIG_INPUT_RPC_PORT = "PIG_INPUT_RPC_PORT"; - public final static String PIG_INPUT_INITIAL_ADDRESS = "PIG_INPUT_INITIAL_ADDRESS"; - public final static String PIG_INPUT_PARTITIONER = "PIG_INPUT_PARTITIONER"; - public final static String PIG_OUTPUT_RPC_PORT = "PIG_OUTPUT_RPC_PORT"; - public final static String PIG_OUTPUT_INITIAL_ADDRESS = "PIG_OUTPUT_INITIAL_ADDRESS"; - public final static String PIG_OUTPUT_PARTITIONER = "PIG_OUTPUT_PARTITIONER"; - public final static String PIG_RPC_PORT = "PIG_RPC_PORT"; - public final static String PIG_INITIAL_ADDRESS = "PIG_INITIAL_ADDRESS"; - public final static String PIG_PARTITIONER = "PIG_PARTITIONER"; - public final static String PIG_INPUT_FORMAT = "PIG_INPUT_FORMAT"; - public final static String PIG_OUTPUT_FORMAT = "PIG_OUTPUT_FORMAT"; - public final static String PIG_INPUT_SPLIT_SIZE = "PIG_INPUT_SPLIT_SIZE"; - - protected String DEFAULT_INPUT_FORMAT; - protected String DEFAULT_OUTPUT_FORMAT; - - public final static String PARTITION_FILTER_SIGNATURE = "cassandra.partition.filter"; - - private static final Logger logger = LoggerFactory.getLogger(AbstractCassandraStorage.class); - - protected String username; - protected String password; - protected String keyspace; - protected String column_family; - protected String loadSignature; - protected String storeSignature; - - protected Configuration conf; - protected 
String inputFormatClass; - protected String outputFormatClass; - protected int splitSize = 64 * 1024; - protected String partitionerClass; - protected boolean usePartitionFilter = false; - protected String initHostAddress; - protected String rpcPort; - protected int nativeProtocolVersion = 1; - - - public AbstractCassandraStorage() - { - super(); - } - - /** Deconstructs a composite type to a Tuple. */ - protected Tuple composeComposite(AbstractCompositeType comparator, ByteBuffer name) throws IOException - { - List<CompositeComponent> result = comparator.deconstruct(name); - Tuple t = TupleFactory.getInstance().newTuple(result.size()); - for (int i=0; i<result.size(); i++) - setTupleValue(t, i, cassandraToObj(result.get(i).comparator, result.get(i).value)); - - return t; - } - - /** convert a column to a tuple */ - protected Tuple columnToTuple(Cell col, CfInfo cfInfo, AbstractType comparator) throws IOException - { - CfDef cfDef = cfInfo.cfDef; - Tuple pair = TupleFactory.getInstance().newTuple(2); - - ByteBuffer colName = col.name().toByteBuffer(); - - // name - if(comparator instanceof AbstractCompositeType) - setTupleValue(pair, 0, composeComposite((AbstractCompositeType)comparator,colName)); - else - setTupleValue(pair, 0, cassandraToObj(comparator, colName)); - - // value - Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef); - if (cfInfo.cql3Table && !cfInfo.compactCqlTable) - { - ByteBuffer[] names = ((AbstractCompositeType) parseType(cfDef.comparator_type)).split(colName); - colName = names[names.length-1]; - } - if (validators.get(colName) == null) - { - Map<MarshallerType, AbstractType> marshallers = getDefaultMarshallers(cfDef); - setTupleValue(pair, 1, cassandraToObj(marshallers.get(MarshallerType.DEFAULT_VALIDATOR), col.value())); - } - else - setTupleValue(pair, 1, cassandraToObj(validators.get(colName), col.value())); - return pair; - } - - /** set the value to the position of the tuple */ - protected void setTupleValue(Tuple pair, int position, Object value) throws ExecException - { - if (value instanceof BigInteger) - pair.set(position, ((BigInteger) value).intValue()); - else if (value instanceof ByteBuffer) - pair.set(position, new DataByteArray(ByteBufferUtil.getArray((ByteBuffer) value))); - else if (value instanceof UUID) - pair.set(position, new DataByteArray(UUIDGen.decompose((java.util.UUID) value))); - else if (value instanceof Date) - pair.set(position, TimestampType.instance.decompose((Date) value).getLong()); - else - pair.set(position, value); - } - - /** get the columnfamily definition for the signature */ - protected CfInfo getCfInfo(String signature) throws IOException - { - UDFContext context = UDFContext.getUDFContext(); - Properties property = context.getUDFProperties(AbstractCassandraStorage.class); - String prop = property.getProperty(signature); - CfInfo cfInfo = new CfInfo(); - cfInfo.cfDef = cfdefFromString(prop.substring(2)); - cfInfo.compactCqlTable = prop.charAt(0) == '1' ? true : false; - cfInfo.cql3Table = prop.charAt(1) == '1' ? 
true : false; - return cfInfo; - } - - /** construct a map to store the mashaller type to cassandra data type mapping */ - protected Map<MarshallerType, AbstractType> getDefaultMarshallers(CfDef cfDef) throws IOException - { - Map<MarshallerType, AbstractType> marshallers = new EnumMap<MarshallerType, AbstractType>(MarshallerType.class); - AbstractType comparator; - AbstractType subcomparator; - AbstractType default_validator; - AbstractType key_validator; - - comparator = parseType(cfDef.getComparator_type()); - subcomparator = parseType(cfDef.getSubcomparator_type()); - default_validator = parseType(cfDef.getDefault_validation_class()); - key_validator = parseType(cfDef.getKey_validation_class()); - - marshallers.put(MarshallerType.COMPARATOR, comparator); - marshallers.put(MarshallerType.DEFAULT_VALIDATOR, default_validator); - marshallers.put(MarshallerType.KEY_VALIDATOR, key_validator); - marshallers.put(MarshallerType.SUBCOMPARATOR, subcomparator); - return marshallers; - } - - /** get the validators */ - protected Map<ByteBuffer, AbstractType> getValidatorMap(CfDef cfDef) throws IOException - { - Map<ByteBuffer, AbstractType> validators = new HashMap<ByteBuffer, AbstractType>(); - for (ColumnDef cd : cfDef.getColumn_metadata()) - { - if (cd.getValidation_class() != null && !cd.getValidation_class().isEmpty()) - { - AbstractType validator = null; - try - { - validator = TypeParser.parse(cd.getValidation_class()); - if (validator instanceof CounterColumnType) - validator = LongType.instance; - validators.put(cd.name, validator); - } - catch (ConfigurationException | SyntaxException e) - { - throw new IOException(e); - } - } - } - return validators; - } - - /** parse the string to a cassandra data type */ - protected AbstractType parseType(String type) throws IOException - { - try - { - // always treat counters like longs, specifically CCT.compose is not what we need - if (type != null && type.equals("org.apache.cassandra.db.marshal.CounterColumnType")) - return LongType.instance; - return TypeParser.parse(type); - } - catch (ConfigurationException e) - { - throw new IOException(e); - } - catch (SyntaxException e) - { - throw new IOException(e); - } - } - - @Override - public InputFormat getInputFormat() throws IOException - { - try - { - return FBUtilities.construct(inputFormatClass, "inputformat"); - } - catch (ConfigurationException e) - { - throw new IOException(e); - } - } - - /** decompose the query to store the parameters in a map */ - public static Map<String, String> getQueryMap(String query) throws UnsupportedEncodingException - { - String[] params = query.split("&"); - Map<String, String> map = new HashMap<String, String>(params.length); - for (String param : params) - { - String[] keyValue = param.split("="); - map.put(keyValue[0], URLDecoder.decode(keyValue[1],"UTF-8")); - } - return map; - } - - /** set hadoop cassandra connection settings */ - protected void setConnectionInformation() throws IOException - { - if (System.getenv(PIG_RPC_PORT) != null) - { - ConfigHelper.setInputRpcPort(conf, System.getenv(PIG_RPC_PORT)); - ConfigHelper.setOutputRpcPort(conf, System.getenv(PIG_RPC_PORT)); - } - - if (System.getenv(PIG_INPUT_RPC_PORT) != null) - ConfigHelper.setInputRpcPort(conf, System.getenv(PIG_INPUT_RPC_PORT)); - if (System.getenv(PIG_OUTPUT_RPC_PORT) != null) - ConfigHelper.setOutputRpcPort(conf, System.getenv(PIG_OUTPUT_RPC_PORT)); - - if (System.getenv(PIG_INITIAL_ADDRESS) != null) - { - ConfigHelper.setInputInitialAddress(conf, System.getenv(PIG_INITIAL_ADDRESS)); - 
ConfigHelper.setOutputInitialAddress(conf, System.getenv(PIG_INITIAL_ADDRESS)); - } - if (System.getenv(PIG_INPUT_INITIAL_ADDRESS) != null) - ConfigHelper.setInputInitialAddress(conf, System.getenv(PIG_INPUT_INITIAL_ADDRESS)); - if (System.getenv(PIG_OUTPUT_INITIAL_ADDRESS) != null) - ConfigHelper.setOutputInitialAddress(conf, System.getenv(PIG_OUTPUT_INITIAL_ADDRESS)); - - if (System.getenv(PIG_PARTITIONER) != null) - { - ConfigHelper.setInputPartitioner(conf, System.getenv(PIG_PARTITIONER)); - ConfigHelper.setOutputPartitioner(conf, System.getenv(PIG_PARTITIONER)); - } - if(System.getenv(PIG_INPUT_PARTITIONER) != null) - ConfigHelper.setInputPartitioner(conf, System.getenv(PIG_INPUT_PARTITIONER)); - if(System.getenv(PIG_OUTPUT_PARTITIONER) != null) - ConfigHelper.setOutputPartitioner(conf, System.getenv(PIG_OUTPUT_PARTITIONER)); - if (System.getenv(PIG_INPUT_FORMAT) != null) - inputFormatClass = getFullyQualifiedClassName(System.getenv(PIG_INPUT_FORMAT)); - else - inputFormatClass = DEFAULT_INPUT_FORMAT; - if (System.getenv(PIG_OUTPUT_FORMAT) != null) - outputFormatClass = getFullyQualifiedClassName(System.getenv(PIG_OUTPUT_FORMAT)); - else - outputFormatClass = DEFAULT_OUTPUT_FORMAT; - } - - /** get the full class name */ - protected String getFullyQualifiedClassName(String classname) - { - return classname.contains(".") ? classname : "org.apache.cassandra.hadoop." + classname; - } - - /** get pig type for the cassandra data type*/ - protected byte getPigType(AbstractType type) - { - if (type instanceof LongType || type instanceof DateType || type instanceof TimestampType) // DateType is bad and it should feel bad - return DataType.LONG; - else if (type instanceof IntegerType || type instanceof Int32Type) // IntegerType will overflow at 2**31, but is kept for compatibility until pig has a BigInteger - return DataType.INTEGER; - else if (type instanceof AsciiType || type instanceof UTF8Type || type instanceof DecimalType || type instanceof InetAddressType) - return DataType.CHARARRAY; - else if (type instanceof FloatType) - return DataType.FLOAT; - else if (type instanceof DoubleType) - return DataType.DOUBLE; - else if (type instanceof AbstractCompositeType || type instanceof CollectionType) - return DataType.TUPLE; - - return DataType.BYTEARRAY; - } - - public ResourceStatistics getStatistics(String location, Job job) - { - return null; - } - - @Override - public String relativeToAbsolutePath(String location, Path curDir) throws IOException - { - return location; - } - - @Override - public void setUDFContextSignature(String signature) - { - this.loadSignature = signature; - } - - /** StoreFunc methods */ - public void setStoreFuncUDFContextSignature(String signature) - { - this.storeSignature = signature; - } - - public String relToAbsPathForStoreLocation(String location, Path curDir) throws IOException - { - return relativeToAbsolutePath(location, curDir); - } - - /** output format */ - public OutputFormat getOutputFormat() throws IOException - { - try - { - return FBUtilities.construct(outputFormatClass, "outputformat"); - } - catch (ConfigurationException e) - { - throw new IOException(e); - } - } - - public void checkSchema(ResourceSchema schema) throws IOException - { - // we don't care about types, they all get casted to ByteBuffers - } - - protected abstract ByteBuffer nullToBB(); - - /** convert object to ByteBuffer */ - protected ByteBuffer objToBB(Object o) - { - if (o == null) - return nullToBB(); - if (o instanceof java.lang.String) - return ByteBuffer.wrap(new 
DataByteArray((String)o).get()); - if (o instanceof Integer) - return Int32Type.instance.decompose((Integer)o); - if (o instanceof Long) - return LongType.instance.decompose((Long)o); - if (o instanceof Float) - return FloatType.instance.decompose((Float)o); - if (o instanceof Double) - return DoubleType.instance.decompose((Double)o); - if (o instanceof UUID) - return ByteBuffer.wrap(UUIDGen.decompose((UUID) o)); - if(o instanceof Tuple) { - List<Object> objects = ((Tuple)o).getAll(); - //collections - if (objects.size() > 0 && objects.get(0) instanceof String) - { - String collectionType = (String) objects.get(0); - if ("set".equalsIgnoreCase(collectionType) || - "list".equalsIgnoreCase(collectionType)) - return objToListOrSetBB(objects.subList(1, objects.size())); - else if ("map".equalsIgnoreCase(collectionType)) - return objToMapBB(objects.subList(1, objects.size())); - - } - return objToCompositeBB(objects); - } - - return ByteBuffer.wrap(((DataByteArray) o).get()); - } - - private ByteBuffer objToListOrSetBB(List<Object> objects) - { - List<ByteBuffer> serialized = new ArrayList<ByteBuffer>(objects.size()); - for(Object sub : objects) - { - ByteBuffer buffer = objToBB(sub); - serialized.add(buffer); - } - // NOTE: using protocol v1 serialization format for collections so as to not break - // compatibility. Not sure if that's the right thing. - return CollectionSerializer.pack(serialized, objects.size(), Server.VERSION_1); - } - - private ByteBuffer objToMapBB(List<Object> objects) - { - List<ByteBuffer> serialized = new ArrayList<ByteBuffer>(objects.size() * 2); - for(Object sub : objects) - { - List<Object> keyValue = ((Tuple)sub).getAll(); - for (Object entry: keyValue) - { - ByteBuffer buffer = objToBB(entry); - serialized.add(buffer); - } - } - // NOTE: using protocol v1 serialization format for collections so as to not break - // compatibility. Not sure if that's the right thing. 
- return CollectionSerializer.pack(serialized, objects.size(), Server.VERSION_1); - } - - private ByteBuffer objToCompositeBB(List<Object> objects) - { - List<ByteBuffer> serialized = new ArrayList<ByteBuffer>(objects.size()); - int totalLength = 0; - for(Object sub : objects) - { - ByteBuffer buffer = objToBB(sub); - serialized.add(buffer); - totalLength += 2 + buffer.remaining() + 1; - } - ByteBuffer out = ByteBuffer.allocate(totalLength); - for (ByteBuffer bb : serialized) - { - int length = bb.remaining(); - out.put((byte) ((length >> 8) & 0xFF)); - out.put((byte) (length & 0xFF)); - out.put(bb); - out.put((byte) 0); - } - out.flip(); - return out; - } - - public void cleanupOnFailure(String failure, Job job) - { - } - - public void cleanupOnSuccess(String location, Job job) throws IOException { - } - - - /** Methods to get the column family schema from Cassandra */ - protected void initSchema(String signature) throws IOException - { - Properties properties = UDFContext.getUDFContext().getUDFProperties(AbstractCassandraStorage.class); - - // Only get the schema if we haven't already gotten it - if (!properties.containsKey(signature)) - { - try - { - Cassandra.Client client = ConfigHelper.getClientFromInputAddressList(conf); - client.set_keyspace(keyspace); - - if (username != null && password != null) - { - Map<String, String> credentials = new HashMap<String, String>(2); - credentials.put(PasswordAuthenticator.USERNAME_KEY, username); - credentials.put(PasswordAuthenticator.PASSWORD_KEY, password); - - try - { - client.login(new AuthenticationRequest(credentials)); - } - catch (AuthenticationException e) - { - logger.error("Authentication exception: invalid username and/or password"); - throw new IOException(e); - } - } - - // compose the CfDef for the columfamily - CfInfo cfInfo = getCfInfo(client); - - if (cfInfo.cfDef != null) - { - StringBuilder sb = new StringBuilder(); - sb.append(cfInfo.compactCqlTable ? 1 : 0).append(cfInfo.cql3Table ? 1: 0).append(cfdefToString(cfInfo.cfDef)); - properties.setProperty(signature, sb.toString()); - } - else - throw new IOException(String.format("Table '%s' not found in keyspace '%s'", - column_family, - keyspace)); - } - catch (Exception e) - { - throw new IOException(e); - } - } - } - - /** convert CfDef to string */ - protected static String cfdefToString(CfDef cfDef) throws IOException - { - assert cfDef != null; - // this is so awful it's kind of cool! 
- TSerializer serializer = new TSerializer(new TBinaryProtocol.Factory()); - try - { - return Hex.bytesToHex(serializer.serialize(cfDef)); - } - catch (TException e) - { - throw new IOException(e); - } - } - - /** convert string back to CfDef */ - protected static CfDef cfdefFromString(String st) throws IOException - { - assert st != null; - TDeserializer deserializer = new TDeserializer(new TBinaryProtocol.Factory()); - CfDef cfDef = new CfDef(); - try - { - deserializer.deserialize(cfDef, Hex.hexToBytes(st)); - } - catch (TException e) - { - throw new IOException(e); - } - return cfDef; - } - - /** return the CfInfo for the column family */ - protected CfInfo getCfInfo(Cassandra.Client client) - throws InvalidRequestException, - UnavailableException, - TimedOutException, - SchemaDisagreementException, - TException, - NotFoundException, - org.apache.cassandra.exceptions.InvalidRequestException, - ConfigurationException, - IOException - { - // get CF meta data - String query = String.format("SELECT type, comparator, subcomparator, default_validator, key_validator, key_aliases " + - "FROM %s.%s " + - "WHERE keyspace_name = '%s' AND columnfamily_name = '%s'", - SystemKeyspace.NAME, - LegacySchemaTables.COLUMNFAMILIES, - keyspace, - column_family); - - CqlResult result = client.execute_cql3_query(ByteBufferUtil.bytes(query), Compression.NONE, ConsistencyLevel.ONE); - - if (result == null || result.rows == null || result.rows.isEmpty()) - return null; - - Iterator<CqlRow> iteraRow = result.rows.iterator(); - CfDef cfDef = new CfDef(); - cfDef.keyspace = keyspace; - cfDef.name = column_family; - boolean cql3Table = false; - if (iteraRow.hasNext()) - { - CqlRow cqlRow = iteraRow.next(); - - cfDef.column_type = ByteBufferUtil.string(cqlRow.columns.get(0).value); - cfDef.comparator_type = ByteBufferUtil.string(cqlRow.columns.get(1).value); - ByteBuffer subComparator = cqlRow.columns.get(2).value; - if (subComparator != null) - cfDef.subcomparator_type = ByteBufferUtil.string(subComparator); - cfDef.default_validation_class = ByteBufferUtil.string(cqlRow.columns.get(3).value); - cfDef.key_validation_class = ByteBufferUtil.string(cqlRow.columns.get(4).value); - String keyAliases = ByteBufferUtil.string(cqlRow.columns.get(5).value); - if (FBUtilities.fromJsonList(keyAliases).size() > 0) - cql3Table = true; - } - cfDef.column_metadata = getColumnMetadata(client); - CfInfo cfInfo = new CfInfo(); - cfInfo.cfDef = cfDef; - if (cql3Table && !(parseType(cfDef.comparator_type) instanceof AbstractCompositeType)) - cfInfo.compactCqlTable = true; - if (cql3Table) - cfInfo.cql3Table = true;; - return cfInfo; - } - - /** get a list of columns */ - protected abstract List<ColumnDef> getColumnMetadata(Cassandra.Client client) - throws InvalidRequestException, - UnavailableException, - TimedOutException, - SchemaDisagreementException, - TException, - CharacterCodingException, - org.apache.cassandra.exceptions.InvalidRequestException, - ConfigurationException, - NotFoundException; - - /** get column meta data */ - protected List<ColumnDef> getColumnMeta(Cassandra.Client client, boolean cassandraStorage, boolean includeCompactValueColumn) - throws InvalidRequestException, - UnavailableException, - TimedOutException, - SchemaDisagreementException, - TException, - CharacterCodingException, - org.apache.cassandra.exceptions.InvalidRequestException, - ConfigurationException, - NotFoundException - { - String query = String.format("SELECT column_name, validator, index_type, type " + - "FROM %s.%s " + - "WHERE keyspace_name 
-    /** return the CfInfo for the column family */
-    protected CfInfo getCfInfo(Cassandra.Client client)
-            throws InvalidRequestException,
-                   UnavailableException,
-                   TimedOutException,
-                   SchemaDisagreementException,
-                   TException,
-                   NotFoundException,
-                   org.apache.cassandra.exceptions.InvalidRequestException,
-                   ConfigurationException,
-                   IOException
-    {
-        // get CF meta data
-        String query = String.format("SELECT type, comparator, subcomparator, default_validator, key_validator, key_aliases " +
-                                     "FROM %s.%s " +
-                                     "WHERE keyspace_name = '%s' AND columnfamily_name = '%s'",
-                                     SystemKeyspace.NAME,
-                                     LegacySchemaTables.COLUMNFAMILIES,
-                                     keyspace,
-                                     column_family);
-
-        CqlResult result = client.execute_cql3_query(ByteBufferUtil.bytes(query), Compression.NONE, ConsistencyLevel.ONE);
-
-        if (result == null || result.rows == null || result.rows.isEmpty())
-            return null;
-
-        Iterator<CqlRow> iteraRow = result.rows.iterator();
-        CfDef cfDef = new CfDef();
-        cfDef.keyspace = keyspace;
-        cfDef.name = column_family;
-        boolean cql3Table = false;
-        if (iteraRow.hasNext())
-        {
-            CqlRow cqlRow = iteraRow.next();
-
-            cfDef.column_type = ByteBufferUtil.string(cqlRow.columns.get(0).value);
-            cfDef.comparator_type = ByteBufferUtil.string(cqlRow.columns.get(1).value);
-            ByteBuffer subComparator = cqlRow.columns.get(2).value;
-            if (subComparator != null)
-                cfDef.subcomparator_type = ByteBufferUtil.string(subComparator);
-            cfDef.default_validation_class = ByteBufferUtil.string(cqlRow.columns.get(3).value);
-            cfDef.key_validation_class = ByteBufferUtil.string(cqlRow.columns.get(4).value);
-            String keyAliases = ByteBufferUtil.string(cqlRow.columns.get(5).value);
-            if (FBUtilities.fromJsonList(keyAliases).size() > 0)
-                cql3Table = true;
-        }
-        cfDef.column_metadata = getColumnMetadata(client);
-        CfInfo cfInfo = new CfInfo();
-        cfInfo.cfDef = cfDef;
-        if (cql3Table && !(parseType(cfDef.comparator_type) instanceof AbstractCompositeType))
-            cfInfo.compactCqlTable = true;
-        if (cql3Table)
-            cfInfo.cql3Table = true;;
-        return cfInfo;
-    }
-
-    /** get a list of columns */
-    protected abstract List<ColumnDef> getColumnMetadata(Cassandra.Client client)
-            throws InvalidRequestException,
-                   UnavailableException,
-                   TimedOutException,
-                   SchemaDisagreementException,
-                   TException,
-                   CharacterCodingException,
-                   org.apache.cassandra.exceptions.InvalidRequestException,
-                   ConfigurationException,
-                   NotFoundException;
-
-    /** get column meta data */
-    protected List<ColumnDef> getColumnMeta(Cassandra.Client client, boolean cassandraStorage, boolean includeCompactValueColumn)
-            throws InvalidRequestException,
-                   UnavailableException,
-                   TimedOutException,
-                   SchemaDisagreementException,
-                   TException,
-                   CharacterCodingException,
-                   org.apache.cassandra.exceptions.InvalidRequestException,
-                   ConfigurationException,
-                   NotFoundException
-    {
-        String query = String.format("SELECT column_name, validator, index_type, type " +
-                                     "FROM %s.%s " +
-                                     "WHERE keyspace_name = '%s' AND columnfamily_name = '%s'",
-                                     SystemKeyspace.NAME,
-                                     LegacySchemaTables.COLUMNS,
-                                     keyspace,
-                                     column_family);
-
-        CqlResult result = client.execute_cql3_query(ByteBufferUtil.bytes(query), Compression.NONE, ConsistencyLevel.ONE);
-
-        List<CqlRow> rows = result.rows;
-        List<ColumnDef> columnDefs = new ArrayList<ColumnDef>();
-        if (rows == null || rows.isEmpty())
-        {
-            // if CassandraStorage, just return the empty list
-            if (cassandraStorage)
-                return columnDefs;
-
-            // otherwise for CqlNativeStorage, check metadata for classic thrift tables
-            CFMetaData cfm = getCFMetaData(keyspace, column_family, client);
-            for (ColumnDefinition def : cfm.regularAndStaticColumns())
-            {
-                ColumnDef cDef = new ColumnDef();
-                String columnName = def.name.toString();
-                String type = def.type.toString();
-                logger.debug("name: {}, type: {} ", columnName, type);
-                cDef.name = ByteBufferUtil.bytes(columnName);
-                cDef.validation_class = type;
-                columnDefs.add(cDef);
-            }
-            // we may not need to include the value column for compact tables as we
-            // could have already processed it as schema_columnfamilies.value_alias
-            if (columnDefs.size() == 0 && includeCompactValueColumn && cfm.compactValueColumn() != null)
-            {
-                ColumnDefinition def = cfm.compactValueColumn();
-                if ("value".equals(def.name.toString()))
-                {
-                    ColumnDef cDef = new ColumnDef();
-                    cDef.name = def.name.bytes;
-                    cDef.validation_class = def.type.toString();
-                    columnDefs.add(cDef);
-                }
-            }
-            return columnDefs;
-        }
-
-        Iterator<CqlRow> iterator = rows.iterator();
-        while (iterator.hasNext())
-        {
-            CqlRow row = iterator.next();
-            ColumnDef cDef = new ColumnDef();
-            String type = ByteBufferUtil.string(row.getColumns().get(3).value);
-            if (!type.equals("regular"))
-                continue;
-            cDef.setName(ByteBufferUtil.clone(row.getColumns().get(0).value));
-            cDef.validation_class = ByteBufferUtil.string(row.getColumns().get(1).value);
-            ByteBuffer indexType = row.getColumns().get(2).value;
-            if (indexType != null)
-                cDef.index_type = getIndexType(ByteBufferUtil.string(indexType));
-            columnDefs.add(cDef);
-        }
-        return columnDefs;
-    }
-
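Both schema lookups above run CQL over the Thrift interface (execute_cql3_query) against
the legacy schema tables. For a hypothetical table ks.cf, the second query renders as
below; in 2.2, SystemKeyspace.NAME is "system" and LegacySchemaTables.COLUMNS is
"schema_columns":

    // Illustrative only: what getColumnMeta() would issue for the hypothetical ks.cf.
    String query = "SELECT column_name, validator, index_type, type " +
                   "FROM system.schema_columns " +
                   "WHERE keyspace_name = 'ks' AND columnfamily_name = 'cf'";
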
-    /** get index type from string */
-    protected IndexType getIndexType(String type)
-    {
-        type = type.toLowerCase();
-        if ("keys".equals(type))
-            return IndexType.KEYS;
-        else if("custom".equals(type))
-            return IndexType.CUSTOM;
-        else if("composites".equals(type))
-            return IndexType.COMPOSITES;
-        else
-            return null;
-    }
-
-    /** return partition keys */
-    public String[] getPartitionKeys(String location, Job job) throws IOException
-    {
-        if (!usePartitionFilter)
-            return null;
-        List<ColumnDef> indexes = getIndexes();
-        String[] partitionKeys = new String[indexes.size()];
-        for (int i = 0; i < indexes.size(); i++)
-        {
-            partitionKeys[i] = new String(indexes.get(i).getName());
-        }
-        return partitionKeys;
-    }
-
-    /** get a list of columns with defined index*/
-    protected List<ColumnDef> getIndexes() throws IOException
-    {
-        CfDef cfdef = getCfInfo(loadSignature).cfDef;
-        List<ColumnDef> indexes = new ArrayList<ColumnDef>();
-        for (ColumnDef cdef : cfdef.column_metadata)
-        {
-            if (cdef.index_type != null)
-                indexes.add(cdef);
-        }
-        return indexes;
-    }
-
-    /** get CFMetaData of a column family */
-    protected CFMetaData getCFMetaData(String ks, String cf, Cassandra.Client client)
-            throws NotFoundException,
-                   InvalidRequestException,
-                   TException,
-                   org.apache.cassandra.exceptions.InvalidRequestException,
-                   ConfigurationException
-    {
-        KsDef ksDef = client.describe_keyspace(ks);
-        for (CfDef cfDef : ksDef.cf_defs)
-        {
-            if (cfDef.name.equalsIgnoreCase(cf))
-                return ThriftConversion.fromThrift(cfDef);
-        }
-        return null;
-    }
-
-    protected Object cassandraToObj(AbstractType validator, ByteBuffer value)
-    {
-        if (validator instanceof DecimalType || validator instanceof InetAddressType)
-            return validator.getString(value);
-
-        if (validator instanceof CollectionType)
-        {
-            // For CollectionType, the compose() method assumes the v3 protocol format of collection, which
-            // is not correct here since we query using the CQL-over-thrift interface which use the pre-v3 format
-            return ((CollectionSerializer)validator.getSerializer()).deserializeForNativeProtocol(value, nativeProtocolVersion);
-        }
-
-        return validator.compose(value);
-    }
-
-    protected static class CfInfo
-    {
-        boolean compactCqlTable = false;
-        boolean cql3Table = false;
-        CfDef cfDef;
-    }
-}
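
The CollectionType special case above exists because collection values fetched over the
CQL-over-Thrift interface arrive in the pre-v3 wire format: protocol v2 and earlier encode
the element count and each element length as unsigned 16-bit shorts, while v3 widened both
to 32-bit ints, hence the negotiated nativeProtocolVersion being threaded through to
deserializeForNativeProtocol(). A self-contained sketch of the pre-v3 layout (illustration
only, not code from this patch):

    import java.nio.ByteBuffer;
    import java.util.ArrayList;
    import java.util.List;

    public final class PreV3CollectionFormat
    {
        /** Split a protocol-v2 serialized collection into its raw element buffers. */
        public static List<ByteBuffer> decodeElements(ByteBuffer serialized)
        {
            ByteBuffer buf = serialized.duplicate();
            int count = buf.getShort() & 0xFFFF;       // v3 reads a 4-byte int here
            List<ByteBuffer> elements = new ArrayList<>(count);
            for (int i = 0; i < count; i++)
            {
                int length = buf.getShort() & 0xFFFF;  // element length, also a short pre-v3
                ByteBuffer element = buf.slice();
                element.limit(length);
                elements.add(element);
                buf.position(buf.position() + length); // advance past the element bytes
            }
            return elements;
        }
    }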

http://git-wip-us.apache.org/repos/asf/cassandra/blob/19366575/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
index 1ad80b7..5d354a7 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
@@ -30,12 +30,12 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.auth.PasswordAuthenticator;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.db.Cell;
 import org.apache.cassandra.db.marshal.*;
 import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.exceptions.SyntaxException;
+import org.apache.cassandra.hadoop.ColumnFamilyRecordReader;
 import org.apache.cassandra.hadoop.ConfigHelper;
 import org.apache.cassandra.hadoop.HadoopCompat;
 import org.apache.cassandra.schema.LegacySchemaTables;
@@ -83,7 +83,7 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
     private boolean slice_reverse = false;
     private boolean allow_deletes = false;
 
-    private RecordReader<ByteBuffer, Map<ByteBuffer, Cell>> reader;
+    private RecordReader<ByteBuffer, Map<ByteBuffer, ColumnFamilyRecordReader.Column>> reader;
     private RecordWriter<ByteBuffer, List<Mutation>> writer;
 
     private boolean widerows = false;
@@ -113,7 +113,7 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
 
     // wide row hacks
     private ByteBuffer lastKey;
-    private Map<ByteBuffer, Cell> lastRow;
+    private Map<ByteBuffer, ColumnFamilyRecordReader.Column> lastRow;
     private boolean hasNext = true;
 
     public CassandraStorage()
@@ -164,7 +164,7 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
                         key = reader.getCurrentKey();
                         tuple = keyToTuple(key, cfDef, parseType(cfDef.getKey_validation_class()));
                     }
-                    for (Map.Entry<ByteBuffer, Cell> entry : lastRow.entrySet())
+                    for (Map.Entry<ByteBuffer, ColumnFamilyRecordReader.Column> entry : lastRow.entrySet())
                     {
                         bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
                     }
@@ -202,7 +202,7 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
                     tuple = keyToTuple(lastKey, cfDef, parseType(cfDef.getKey_validation_class()));
                 else
                     addKeyToTuple(tuple, lastKey, cfDef, parseType(cfDef.getKey_validation_class()));
-                for (Map.Entry<ByteBuffer, Cell> entry : lastRow.entrySet())
+                for (Map.Entry<ByteBuffer, ColumnFamilyRecordReader.Column> entry : lastRow.entrySet())
                 {
                     bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
                 }
@@ -216,17 +216,18 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
                 else
                     addKeyToTuple(tuple, lastKey, cfDef, parseType(cfDef.getKey_validation_class()));
             }
-            SortedMap<ByteBuffer, Cell> row = (SortedMap<ByteBuffer, Cell>)reader.getCurrentValue();
+            SortedMap<ByteBuffer, ColumnFamilyRecordReader.Column> row =
+                (SortedMap<ByteBuffer, ColumnFamilyRecordReader.Column>)reader.getCurrentValue();
             if (lastRow != null) // prepend what was read last time
             {
-                for (Map.Entry<ByteBuffer, Cell> entry : lastRow.entrySet())
+                for (Map.Entry<ByteBuffer, ColumnFamilyRecordReader.Column> entry : lastRow.entrySet())
                 {
                     bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
                 }
                 lastKey = null;
                 lastRow = null;
             }
-            for (Map.Entry<ByteBuffer, Cell> entry : row.entrySet())
+            for (Map.Entry<ByteBuffer, ColumnFamilyRecordReader.Column> entry : row.entrySet())
             {
                 bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
             }
@@ -251,7 +252,7 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
 
         CfDef cfDef = getCfDef(loadSignature);
         ByteBuffer key = reader.getCurrentKey();
-        Map<ByteBuffer, Cell> cf = reader.getCurrentValue();
+        Map<ByteBuffer, ColumnFamilyRecordReader.Column> cf = reader.getCurrentValue();
         assert key != null && cf != null;
 
         // output tuple, will hold the key, each indexed column in a tuple, then a bag of the rest
@@ -285,7 +286,7 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
             added.put(cdef.name, true);
         }
         // now add all the other columns
-        for (Map.Entry<ByteBuffer, Cell> entry : cf.entrySet())
+        for (Map.Entry<ByteBuffer, ColumnFamilyRecordReader.Column> entry : cf.entrySet())
        {
             if (!added.containsKey(entry.getKey()))
                 bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
@@ -1338,27 +1339,25 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
     }
 
     /** convert a column to a tuple */
-    protected Tuple columnToTuple(Cell col, CfDef cfDef, AbstractType comparator) throws IOException
+    protected Tuple columnToTuple(ColumnFamilyRecordReader.Column column, CfDef cfDef, AbstractType comparator) throws IOException
     {
         Tuple pair = TupleFactory.getInstance().newTuple(2);
 
-        ByteBuffer colName = col.name().toByteBuffer();
-
         // name
         if(comparator instanceof AbstractCompositeType)
-            StorageHelper.setTupleValue(pair, 0, composeComposite((AbstractCompositeType) comparator, colName));
+            StorageHelper.setTupleValue(pair, 0, composeComposite((AbstractCompositeType) comparator, column.name));
         else
-            StorageHelper.setTupleValue(pair, 0, StorageHelper.cassandraToObj(comparator, colName, nativeProtocolVersion));
+            StorageHelper.setTupleValue(pair, 0, StorageHelper.cassandraToObj(comparator, column.name, nativeProtocolVersion));
 
         // value
         Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef);
-        if (validators.get(colName) == null)
+        if (validators.get(column.name) == null)
        {
             Map<MarshallerType, AbstractType> marshallers = getDefaultMarshallers(cfDef);
-            StorageHelper.setTupleValue(pair, 1, StorageHelper.cassandraToObj(marshallers.get(MarshallerType.DEFAULT_VALIDATOR), col.value(), nativeProtocolVersion));
+            StorageHelper.setTupleValue(pair, 1, StorageHelper.cassandraToObj(marshallers.get(MarshallerType.DEFAULT_VALIDATOR), column.value, nativeProtocolVersion));
         }
         else
-            StorageHelper.setTupleValue(pair, 1, StorageHelper.cassandraToObj(validators.get(colName), col.value(), nativeProtocolVersion));
+            StorageHelper.setTupleValue(pair, 1, StorageHelper.cassandraToObj(validators.get(column.name), column.value, nativeProtocolVersion));
 
         return pair;
     }
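
With Cell gone from this path, the reader now hands back the Column type nested in
ColumnFamilyRecordReader (that file is also touched by this commit; its hunk is not part
of this excerpt). Judging purely from the usage above, where column.name and column.value
are read as plain fields, the holder is presumably along these lines (hypothetical sketch,
not the actual definition):

    // Hypothetical sketch inferred from the column.name / column.value accesses above;
    // see ColumnFamilyRecordReader.java in this commit for the real nested type.
    public static class Column
    {
        public final ByteBuffer name;
        public final ByteBuffer value;

        public Column(ByteBuffer name, ByteBuffer value)
        {
            this.name = name;
            this.value = value;
        }
    }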

http://git-wip-us.apache.org/repos/asf/cassandra/blob/19366575/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
index 537f30c..dc3c174 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
@@ -34,9 +34,6 @@ import com.datastax.driver.core.Row;
 import com.datastax.driver.core.Session;
 import com.datastax.driver.core.TableMetadata;
 import com.datastax.driver.core.exceptions.NoHostAvailableException;
-import org.apache.cassandra.db.BufferCell;
-import org.apache.cassandra.db.Cell;
-import org.apache.cassandra.db.composites.CellNames;
 import org.apache.cassandra.db.marshal.*;
 import org.apache.cassandra.exceptions.AuthenticationException;
 import org.apache.cassandra.exceptions.ConfigurationException;
@@ -159,9 +156,9 @@ public class CqlNativeStorage extends LoadFunc implements StoreFuncInterface, Lo
                     ByteBuffer columnValue = row.getBytesUnsafe(cdef.getName());
                     if (columnValue != null)
                     {
-                        Cell cell = new BufferCell(CellNames.simpleDense(ByteBufferUtil.bytes(cdef.getName())), columnValue);
                         AbstractType<?> validator = getValidatorMap(tableMetadata).get(ByteBufferUtil.bytes(cdef.getName()));
-                        setTupleValue(tuple, i, cqlColumnToObj(cell, tableMetadata), validator);
+                        setTupleValue(tuple, i, cqlColumnToObj(ByteBufferUtil.bytes(cdef.getName()), columnValue,
+                                                               tableMetadata), validator);
                     }
                     else
                         tuple.set(i, null);
@@ -176,12 +173,11 @@ public class CqlNativeStorage extends LoadFunc implements StoreFuncInterface, Lo
     }
 
     /** convert a cql column to an object */
-    private Object cqlColumnToObj(Cell col, TableInfo cfDef) throws IOException
+    private Object cqlColumnToObj(ByteBuffer name, ByteBuffer columnValue, TableInfo cfDef) throws IOException
     {
         // standard
         Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef);
-        ByteBuffer cellName = col.name().toByteBuffer();
-        return StorageHelper.cassandraToObj(validators.get(cellName), col.value(), nativeProtocolVersion);
+        return StorageHelper.cassandraToObj(validators.get(name), columnValue, nativeProtocolVersion);
     }
 
     /** set the value to the position of the tuple */

http://git-wip-us.apache.org/repos/asf/cassandra/blob/19366575/test/conf/logback-test.xml
----------------------------------------------------------------------
diff --git a/test/conf/logback-test.xml b/test/conf/logback-test.xml
index 8d99aa2..6d75aaf 100644
--- a/test/conf/logback-test.xml
+++ b/test/conf/logback-test.xml
@@ -61,7 +61,9 @@
         <level>WARN</level>
       </filter>
   </appender>
-
+
+  <logger name="org.apache.hadoop" level="WARN"/>
+
   <root level="DEBUG">
     <appender-ref ref="ASYNCFILE" />
     <appender-ref ref="STDERR" />
