Remove deprecated legacy Hadoop code

patch by Philip Thompson; reviewed by Aleksey Yeschenko for CASSANDRA-9353
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/446e2537 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/446e2537 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/446e2537 Branch: refs/heads/trunk Commit: 446e2537895c15b404a74107069a12f3fc404b15 Parents: d62cd1b Author: Philip Thompson <[email protected]> Authored: Mon Jun 8 22:41:28 2015 +0300 Committer: Aleksey Yeschenko <[email protected]> Committed: Mon Jun 8 22:41:28 2015 +0300 ---------------------------------------------------------------------- CHANGES.txt | 1 + NEWS.txt | 14 + examples/hadoop_word_count/README.txt | 50 - examples/hadoop_word_count/bin/word_count | 61 - .../hadoop_word_count/bin/word_count_counters | 59 - examples/hadoop_word_count/bin/word_count_setup | 61 - examples/hadoop_word_count/build.xml | 113 -- examples/hadoop_word_count/conf/logback.xml | 42 - examples/hadoop_word_count/ivy.xml | 24 - examples/hadoop_word_count/src/WordCount.java | 222 --- .../src/WordCountCounters.java | 104 -- .../hadoop_word_count/src/WordCountSetup.java | 239 --- .../hadoop/AbstractColumnFamilyInputFormat.java | 296 ---- .../cassandra/hadoop/BulkOutputFormat.java | 91 -- .../cassandra/hadoop/BulkRecordWriter.java | 296 ---- .../hadoop/ColumnFamilyInputFormat.java | 125 -- .../hadoop/ColumnFamilyOutputFormat.java | 183 --- .../hadoop/ColumnFamilyRecordReader.java | 615 -------- .../hadoop/ColumnFamilyRecordWriter.java | 341 ----- .../hadoop/cql3/CqlBulkRecordWriter.java | 12 +- .../cassandra/hadoop/cql3/CqlInputFormat.java | 277 +++- .../cassandra/hadoop/cql3/CqlOutputFormat.java | 3 + .../cassandra/hadoop/cql3/CqlRecordWriter.java | 5 +- .../cassandra/hadoop/pig/CassandraStorage.java | 1397 ------------------ .../org/apache/cassandra/pig/PigTestBase.java | 27 +- .../pig/ThriftColumnFamilyDataTypeTest.java | 70 +- .../cassandra/pig/ThriftColumnFamilyTest.java | 320 ++-- 27 files changed, 451 insertions(+), 4597 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/446e2537/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 560bd2b..061bd5f 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.0: + * Remove deprecated legacy Hadoop code (CASSANDRA-9353) * Decommissioned nodes will not rejoin the cluster (CASSANDRA-8801) * Change gossip stabilization to use endpoit size (CASSANDRA-9401) * Change default garbage collector to G1 (CASSANDRA-7486) http://git-wip-us.apache.org/repos/asf/cassandra/blob/446e2537/NEWS.txt ---------------------------------------------------------------------- diff --git a/NEWS.txt b/NEWS.txt index 9beb911..44637eb 100644 --- a/NEWS.txt +++ b/NEWS.txt @@ -13,6 +13,20 @@ restore snapshots created with the previous major version using the 'sstableloader' tool. You can upgrade the file format of your snapshots using the provided 'sstableupgrade' tool. +3.0 +=== + +Upgrading +--------- + - Pig's CassandraStorage has been removed. Use CqlNativeStorage instead. + - Hadoop BulkOutputFormat and BulkRecordWriter have been removed; use + CqlBulkOutputFormat and CqlBulkRecordWriter instead. + - Hadoop ColumnFamilyInputFormat and ColumnFamilyOutputFormat have been removed; + use CqlInputFormat and CqlOutputFormat instead. + - Hadoop ColumnFamilyRecordReader and ColumnFamilyRecordWriter have been removed; + use CqlRecordReader and CqlRecordWriter instead. 
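For jobs being migrated off the removed classes, the CQL replacements are configured through the same ConfigHelper plumbing. Below is a minimal sketch of a job wired up with CqlInputFormat and CqlOutputFormat, modeled on the surviving hadoop_cql3_word_count example; the keyspace, table, and update statement are illustrative, not part of this commit:

    import org.apache.cassandra.hadoop.ConfigHelper;
    import org.apache.cassandra.hadoop.cql3.CqlConfigHelper;
    import org.apache.cassandra.hadoop.cql3.CqlInputFormat;
    import org.apache.cassandra.hadoop.cql3.CqlOutputFormat;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    public class CqlJobSketch
    {
        public static Job configure() throws Exception
        {
            Job job = new Job(new Configuration(), "wordcount-cql");
            job.setJarByClass(CqlJobSketch.class);

            // Input: CqlInputFormat replaces ColumnFamilyInputFormat; the mapper
            // receives (Long, com.datastax.driver.core.Row) pairs instead of
            // (ByteBuffer, SortedMap<ByteBuffer, Column>).
            job.setInputFormatClass(CqlInputFormat.class);
            ConfigHelper.setInputInitialAddress(job.getConfiguration(), "localhost");
            ConfigHelper.setInputPartitioner(job.getConfiguration(), "Murmur3Partitioner");
            ConfigHelper.setInputColumnFamily(job.getConfiguration(), "wordcount", "input_words");
            CqlConfigHelper.setInputCQLPageRowSize(job.getConfiguration(), "3");

            // Output: CqlOutputFormat replaces ColumnFamilyOutputFormat; the reducer
            // emits (Map<String, ByteBuffer> key columns, List<ByteBuffer> bound values)
            // which are applied to the parameterized CQL statement below.
            job.setOutputFormatClass(CqlOutputFormat.class);
            ConfigHelper.setOutputInitialAddress(job.getConfiguration(), "localhost");
            ConfigHelper.setOutputPartitioner(job.getConfiguration(), "Murmur3Partitioner");
            ConfigHelper.setOutputColumnFamily(job.getConfiguration(), "wordcount", "output_words");
            CqlConfigHelper.setOutputCql(job.getConfiguration(),
                                         "UPDATE wordcount.output_words SET count_num = ?");
            return job;
        }
    }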
+ + 2.2 === http://git-wip-us.apache.org/repos/asf/cassandra/blob/446e2537/examples/hadoop_word_count/README.txt ---------------------------------------------------------------------- diff --git a/examples/hadoop_word_count/README.txt b/examples/hadoop_word_count/README.txt deleted file mode 100644 index e336b89..0000000 --- a/examples/hadoop_word_count/README.txt +++ /dev/null @@ -1,50 +0,0 @@ -Introduction -============ - -WordCount hadoop example: Inserts a bunch of words across multiple rows, -and counts them, with RandomPartitioner. The word_count_counters example sums -the value of counter columns for a key. - -The scripts in bin/ assume you are running with cwd of examples/word_count. - - -Running -======= - -First build and start a Cassandra server with the default configuration*. Ensure that the Thrift -interface is enabled, either by setting start_rpc:true in cassandra.yaml or by running -`nodetool enablethrift` after startup. -Once Cassandra has started and the Thrift interface is available, run - -contrib/word_count$ ant -contrib/word_count$ bin/word_count_setup -contrib/word_count$ bin/word_count -contrib/word_count$ bin/word_count_counters - -In order to view the results in Cassandra, one can use bin/cqlsh and -perform the following operations: -$ bin/cqlsh localhost -> use wordcount; -> select * from output_words; - -The output of the word count can now be configured. In the bin/word_count -file, you can specify the OUTPUT_REDUCER. The two options are 'filesystem' -and 'cassandra'. The filesystem option outputs to the /tmp/word_count* -directories. The cassandra option outputs to the 'output_words' column family -in the 'wordcount' keyspace. 'cassandra' is the default. - -Read the code in src/ for more details. - -The word_count_counters example sums the counter columns for a row. The output -is written to a text file in /tmp/word_count_counters. - -*It is recommended to turn off vnodes when running Cassandra with hadoop. -This is done by setting "num_tokens: 1" in cassandra.yaml. If you want to -point wordcount at a real cluster, modify the seed and listenaddress -settings accordingly. - - -Troubleshooting -=============== - -word_count uses conf/logback.xml to log to wc.out. http://git-wip-us.apache.org/repos/asf/cassandra/blob/446e2537/examples/hadoop_word_count/bin/word_count ---------------------------------------------------------------------- diff --git a/examples/hadoop_word_count/bin/word_count b/examples/hadoop_word_count/bin/word_count deleted file mode 100755 index 34534d7..0000000 --- a/examples/hadoop_word_count/bin/word_count +++ /dev/null @@ -1,61 +0,0 @@ -#!/bin/sh - -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -cwd=`dirname $0` - -# Cassandra class files. -if [ ! 
-d $cwd/../../../build/classes/main ]; then - echo "Unable to locate cassandra class files" >&2 - exit 1 -fi - -# word_count Jar. -if [ ! -e $cwd/../build/word_count.jar ]; then - echo "Unable to locate word_count jar" >&2 - exit 1 -fi - -CLASSPATH=$CLASSPATH:$cwd/../conf -CLASSPATH=$CLASSPATH:$cwd/../build/word_count.jar -CLASSPATH=$CLASSPATH:$cwd/../../../build/classes/main -CLASSPATH=$CLASSPATH:$cwd/../../../build/classes/thrift -for jar in $cwd/../build/lib/jars/*.jar; do - CLASSPATH=$CLASSPATH:$jar -done -for jar in $cwd/../../../lib/*.jar; do - CLASSPATH=$CLASSPATH:$jar -done -for jar in $cwd/../../../build/lib/jars/*.jar; do - CLASSPATH=$CLASSPATH:$jar -done - -if [ -x $JAVA_HOME/bin/java ]; then - JAVA=$JAVA_HOME/bin/java -else - JAVA=`which java` -fi - -if [ "x$JAVA" = "x" ]; then - echo "Java executable not found (hint: set JAVA_HOME)" >&2 - exit 1 -fi - -OUTPUT_REDUCER=cassandra - -#echo $CLASSPATH -"$JAVA" -Xmx1G -ea -cp "$CLASSPATH" WordCount output_reducer=$OUTPUT_REDUCER http://git-wip-us.apache.org/repos/asf/cassandra/blob/446e2537/examples/hadoop_word_count/bin/word_count_counters ---------------------------------------------------------------------- diff --git a/examples/hadoop_word_count/bin/word_count_counters b/examples/hadoop_word_count/bin/word_count_counters deleted file mode 100755 index 122565d..0000000 --- a/examples/hadoop_word_count/bin/word_count_counters +++ /dev/null @@ -1,59 +0,0 @@ -#!/bin/sh - -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -cwd=`dirname $0` - -# Cassandra class files. -if [ ! -d $cwd/../../../build/classes/main ]; then - echo "Unable to locate cassandra class files" >&2 - exit 1 -fi - -# word_count Jar. -if [ ! 
-e $cwd/../build/word_count.jar ]; then - echo "Unable to locate word_count jar" >&2 - exit 1 -fi - -CLASSPATH=$CLASSPATH:$cwd/../conf -CLASSPATH=$CLASSPATH:$cwd/../build/word_count.jar -CLASSPATH=$CLASSPATH:$cwd/../../../build/classes/main -CLASSPATH=$CLASSPATH:$cwd/../../../build/classes/thrift -for jar in $cwd/../build/lib/jars/*.jar; do - CLASSPATH=$CLASSPATH:$jar -done -for jar in $cwd/../../../lib/*.jar; do - CLASSPATH=$CLASSPATH:$jar -done -for jar in $cwd/../../../build/lib/jars/*.jar; do - CLASSPATH=$CLASSPATH:$jar -done - -if [ -x $JAVA_HOME/bin/java ]; then - JAVA=$JAVA_HOME/bin/java -else - JAVA=`which java` -fi - -if [ "x$JAVA" = "x" ]; then - echo "Java executable not found (hint: set JAVA_HOME)" >&2 - exit 1 -fi - -#echo $CLASSPATH -"$JAVA" -Xmx1G -ea -cp "$CLASSPATH" WordCountCounters http://git-wip-us.apache.org/repos/asf/cassandra/blob/446e2537/examples/hadoop_word_count/bin/word_count_setup ---------------------------------------------------------------------- diff --git a/examples/hadoop_word_count/bin/word_count_setup b/examples/hadoop_word_count/bin/word_count_setup deleted file mode 100755 index 6e5650f..0000000 --- a/examples/hadoop_word_count/bin/word_count_setup +++ /dev/null @@ -1,61 +0,0 @@ -#!/bin/sh - -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -cwd=`dirname $0` - -# Cassandra class files. -if [ ! -d $cwd/../../../build/classes/main ]; then - echo "Unable to locate cassandra class files" >&2 - exit 1 -fi - -# word_count Jar. -if [ ! 
-e $cwd/../build/word_count.jar ]; then - echo "Unable to locate word_count jar" >&2 - exit 1 -fi - -CLASSPATH=$CLASSPATH:$cwd/../build/word_count.jar -CLASSPATH=$CLASSPATH:.:$cwd/../../../build/classes/main -CLASSPATH=$CLASSPATH:.:$cwd/../../../build/classes/thrift -for jar in $cwd/../build/lib/jars/*.jar; do - CLASSPATH=$CLASSPATH:$jar -done -for jar in $cwd/../../../lib/*.jar; do - CLASSPATH=$CLASSPATH:$jar -done -for jar in $cwd/../../../build/lib/jars/*.jar; do - CLASSPATH=$CLASSPATH:$jar -done - -if [ -x $JAVA_HOME/bin/java ]; then - JAVA=$JAVA_HOME/bin/java -else - JAVA=`which java` -fi - -if [ "x$JAVA" = "x" ]; then - echo "Java executable not found (hint: set JAVA_HOME)" >&2 - exit 1 -fi - -HOST=localhost -PORT=9160 -FRAMED=true - -"$JAVA" -Xmx1G -ea -Dcassandra.host=$HOST -Dcassandra.port=$PORT -Dcassandra.framed=$FRAMED -cp "$CLASSPATH" WordCountSetup http://git-wip-us.apache.org/repos/asf/cassandra/blob/446e2537/examples/hadoop_word_count/build.xml ---------------------------------------------------------------------- diff --git a/examples/hadoop_word_count/build.xml b/examples/hadoop_word_count/build.xml deleted file mode 100644 index 939e1b3..0000000 --- a/examples/hadoop_word_count/build.xml +++ /dev/null @@ -1,113 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - ~ Licensed to the Apache Software Foundation (ASF) under one - ~ or more contributor license agreements. See the NOTICE file - ~ distributed with this work for additional information - ~ regarding copyright ownership. The ASF licenses this file - ~ to you under the Apache License, Version 2.0 (the - ~ "License"); you may not use this file except in compliance - ~ with the License. You may obtain a copy of the License at - ~ - ~ http://www.apache.org/licenses/LICENSE-2.0 - ~ - ~ Unless required by applicable law or agreed to in writing, - ~ software distributed under the License is distributed on an - ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - ~ KIND, either express or implied. See the License for the - ~ specific language governing permissions and limitations - ~ under the License. - --> -<project default="jar" name="word_count" xmlns:ivy="antlib:org.apache.ivy.ant"> - <property name="cassandra.dir" value="../.." 
/> - <property name="cassandra.dir.lib" value="${cassandra.dir}/lib" /> - <property name="cassandra.classes" value="${cassandra.dir}/build/classes" /> - <property name="build.src" value="${basedir}/src" /> - <property name="build.dir" value="${basedir}/build" /> - <property name="ivy.lib.dir" value="${build.dir}/lib" /> - <property name="build.classes" value="${build.dir}/classes" /> - <property name="final.name" value="word_count" /> - <property name="ivy.version" value="2.1.0" /> - <property name="ivy.url" - value="http://repo2.maven.org/maven2/org/apache/ivy/ivy" /> - - <condition property="ivy.jar.exists"> - <available file="${build.dir}/ivy-${ivy.version}.jar" /> - </condition> - - <path id="autoivy.classpath"> - <fileset dir="${ivy.lib.dir}"> - <include name="**/*.jar" /> - </fileset> - <pathelement location="${build.dir}/ivy-${ivy.version}.jar"/> - </path> - - <path id="wordcount.build.classpath"> - <fileset dir="${ivy.lib.dir}"> - <include name="**/*.jar" /> - </fileset> - <!-- cassandra dependencies --> - <fileset dir="${cassandra.dir.lib}"> - <include name="**/*.jar" /> - </fileset> - <fileset dir="${cassandra.dir}/build/lib/jars"> - <include name="**/*.jar" /> - </fileset> - <pathelement location="${cassandra.classes}/main" /> - <pathelement location="${cassandra.classes}/thrift" /> - </path> - - <target name="init"> - <mkdir dir="${build.classes}" /> - </target> - - <target depends="init,ivy-retrieve-build" name="build"> - <javac destdir="${build.classes}"> - <src path="${build.src}" /> - <classpath refid="wordcount.build.classpath" /> - </javac> - </target> - - <target name="jar" depends="build"> - <mkdir dir="${build.classes}/META-INF" /> - <jar jarfile="${build.dir}/${final.name}.jar"> - <fileset dir="${build.classes}" /> - <fileset dir="${cassandra.classes}/main" /> - <fileset dir="${cassandra.classes}/thrift" /> - <fileset dir="${cassandra.dir}"> - <include name="lib/**/*.jar" /> - </fileset> - <zipfileset dir="${cassandra.dir}/build/lib/jars/" prefix="lib"> - <include name="**/*.jar" /> - </zipfileset> - <fileset file="${basedir}/cassandra.yaml" /> - </jar> - </target> - - <target name="clean"> - <delete dir="${build.dir}" /> - </target> - - <!-- - Ivy Specific targets - to fetch Ivy and this project's dependencies - --> - <target name="ivy-download" unless="ivy.jar.exists"> - <echo>Downloading Ivy...</echo> - <mkdir dir="${build.dir}" /> - <get src="${ivy.url}/${ivy.version}/ivy-${ivy.version}.jar" - dest="${build.dir}/ivy-${ivy.version}.jar" usetimestamp="true" /> - </target> - - <target name="ivy-init" depends="ivy-download" unless="ivy.initialized"> - <mkdir dir="${ivy.lib.dir}"/> - <taskdef resource="org/apache/ivy/ant/antlib.xml" - uri="antlib:org.apache.ivy.ant" - classpathref="autoivy.classpath"/> - <property name="ivy.initialized" value="true"/> - </target> - - <target name="ivy-retrieve-build" depends="ivy-init"> - <ivy:retrieve type="jar,source" sync="true" - pattern="${ivy.lib.dir}/[type]s/[artifact]-[revision].[ext]" /> - </target> -</project> http://git-wip-us.apache.org/repos/asf/cassandra/blob/446e2537/examples/hadoop_word_count/conf/logback.xml ---------------------------------------------------------------------- diff --git a/examples/hadoop_word_count/conf/logback.xml b/examples/hadoop_word_count/conf/logback.xml deleted file mode 100644 index 443bd1c..0000000 --- a/examples/hadoop_word_count/conf/logback.xml +++ /dev/null @@ -1,42 +0,0 @@ -<!-- - Licensed to the Apache Software Foundation (ASF) under one - or more contributor license agreements. 
See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, - software distributed under the License is distributed on an - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - KIND, either express or implied. See the License for the - specific language governing permissions and limitations - under the License. ---> - -<configuration scan="true"> - - <jmxConfigurator /> - - <appender name="FILE" class="ch.qos.logback.core.FileAppender"> - <file>wc.out</file> - <encoder> - <pattern>%-5level [%thread] %date{ISO8601} %F:%L - %msg%n</pattern> - </encoder> - </appender> - - <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> - <encoder> - <pattern>%-5level %date{HH:mm:ss,SSS} %msg%n</pattern> - </encoder> - </appender> - - <root level="INFO"> - <appender-ref ref="FILE" /> - <appender-ref ref="STDOUT" /> - </root> - -</configuration> http://git-wip-us.apache.org/repos/asf/cassandra/blob/446e2537/examples/hadoop_word_count/ivy.xml ---------------------------------------------------------------------- diff --git a/examples/hadoop_word_count/ivy.xml b/examples/hadoop_word_count/ivy.xml deleted file mode 100644 index 2016eb8..0000000 --- a/examples/hadoop_word_count/ivy.xml +++ /dev/null @@ -1,24 +0,0 @@ -<!-- - ~ Licensed to the Apache Software Foundation (ASF) under one - ~ or more contributor license agreements. See the NOTICE file - ~ distributed with this work for additional information - ~ regarding copyright ownership. The ASF licenses this file - ~ to you under the Apache License, Version 2.0 (the - ~ "License"); you may not use this file except in compliance - ~ with the License. You may obtain a copy of the License at - ~ - ~ http://www.apache.org/licenses/LICENSE-2.0 - ~ - ~ Unless required by applicable law or agreed to in writing, - ~ software distributed under the License is distributed on an - ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - ~ KIND, either express or implied. See the License for the - ~ specific language governing permissions and limitations - ~ under the License. - --> -<ivy-module version="2.0"> - <info organisation="apache-cassandra" module="word-count"/> - <dependencies> - <dependency org="org.apache.hadoop" name="hadoop-core" rev="1.0.3"/> - </dependencies> -</ivy-module> http://git-wip-us.apache.org/repos/asf/cassandra/blob/446e2537/examples/hadoop_word_count/src/WordCount.java ---------------------------------------------------------------------- diff --git a/examples/hadoop_word_count/src/WordCount.java b/examples/hadoop_word_count/src/WordCount.java deleted file mode 100644 index d092f1f..0000000 --- a/examples/hadoop_word_count/src/WordCount.java +++ /dev/null @@ -1,222 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.*; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.cassandra.hadoop.*; -import org.apache.cassandra.thrift.*; -import org.apache.cassandra.utils.ByteBufferUtil; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.conf.Configured; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.Mapper; -import org.apache.hadoop.mapreduce.Reducer; -import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; -import org.apache.hadoop.util.Tool; -import org.apache.hadoop.util.ToolRunner; - -/** - * This counts the occurrences of words in ColumnFamily Standard1, that has a single column (that we care about) - * "text" containing a sequence of words. - * - * For each word, we output the total number of occurrences across all texts. - * - * When outputting to Cassandra, we write the word counts as a {word, count} column/value pair, - * with a row key equal to the name of the source column we read the words from. - */ -public class WordCount extends Configured implements Tool -{ - private static final Logger logger = LoggerFactory.getLogger(WordCount.class); - - static final String KEYSPACE = "wordcount"; - static final String COLUMN_FAMILY = "input_words"; - - static final String OUTPUT_REDUCER_VAR = "output_reducer"; - static final String OUTPUT_COLUMN_FAMILY = "output_words"; - private static final String OUTPUT_PATH_PREFIX = "/tmp/word_count"; - - private static final String CONF_COLUMN_NAME = "columnname"; - - public static void main(String[] args) throws Exception - { - // Let ToolRunner handle generic command-line options - ToolRunner.run(new Configuration(), new WordCount(), args); - System.exit(0); - } - - public static class TokenizerMapper extends Mapper<ByteBuffer, SortedMap<ByteBuffer, ColumnFamilyRecordReader.Column>, Text, IntWritable> - { - private final static IntWritable one = new IntWritable(1); - private Text word = new Text(); - private ByteBuffer sourceColumn; - - protected void setup(org.apache.hadoop.mapreduce.Mapper.Context context) - throws IOException, InterruptedException - { - } - - public void map(ByteBuffer key, SortedMap<ByteBuffer, ColumnFamilyRecordReader.Column> columns, Context context) throws IOException, InterruptedException - { - for (ColumnFamilyRecordReader.Column column : columns.values()) - { - String name = ByteBufferUtil.string(column.name); - String value = null; - - if (name.contains("int")) - value = String.valueOf(ByteBufferUtil.toInt(column.value)); - else - value = ByteBufferUtil.string(column.value); - - logger.debug("read {}:{}={} from {}", - new Object[] {ByteBufferUtil.string(key), name, value, context.getInputSplit()}); - - StringTokenizer itr = new StringTokenizer(value); - while (itr.hasMoreTokens()) - { - word.set(itr.nextToken()); - context.write(word, one); - } - } - } - } - - public static class ReducerToFilesystem extends Reducer<Text, IntWritable, Text, 
IntWritable> - { - public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException - { - int sum = 0; - for (IntWritable val : values) - sum += val.get(); - context.write(key, new IntWritable(sum)); - } - } - - public static class ReducerToCassandra extends Reducer<Text, IntWritable, ByteBuffer, List<Mutation>> - { - private ByteBuffer outputKey; - - protected void setup(org.apache.hadoop.mapreduce.Reducer.Context context) - throws IOException, InterruptedException - { - outputKey = ByteBufferUtil.bytes(context.getConfiguration().get(CONF_COLUMN_NAME)); - } - - public void reduce(Text word, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException - { - int sum = 0; - for (IntWritable val : values) - sum += val.get(); - context.write(outputKey, Collections.singletonList(getMutation(word, sum))); - } - - private static Mutation getMutation(Text word, int sum) - { - org.apache.cassandra.thrift.Column c = new org.apache.cassandra.thrift.Column(); - c.setName(Arrays.copyOf(word.getBytes(), word.getLength())); - c.setValue(ByteBufferUtil.bytes(sum)); - c.setTimestamp(System.currentTimeMillis()); - - Mutation m = new Mutation(); - m.setColumn_or_supercolumn(new ColumnOrSuperColumn()); - m.column_or_supercolumn.setColumn(c); - return m; - } - } - - public int run(String[] args) throws Exception - { - String outputReducerType = "filesystem"; - if (args != null && args[0].startsWith(OUTPUT_REDUCER_VAR)) - { - String[] s = args[0].split("="); - if (s != null && s.length == 2) - outputReducerType = s[1]; - } - logger.info("output reducer type: " + outputReducerType); - - // use a smaller page size that doesn't divide the row count evenly to exercise the paging logic better - ConfigHelper.setRangeBatchSize(getConf(), 99); - - for (int i = 0; i < WordCountSetup.TEST_COUNT; i++) - { - String columnName = "text" + i; - - Job job = new Job(getConf(), "wordcount"); - job.setJarByClass(WordCount.class); - job.setMapperClass(TokenizerMapper.class); - - if (outputReducerType.equalsIgnoreCase("filesystem")) - { - job.setCombinerClass(ReducerToFilesystem.class); - job.setReducerClass(ReducerToFilesystem.class); - job.setOutputKeyClass(Text.class); - job.setOutputValueClass(IntWritable.class); - FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH_PREFIX + i)); - } - else - { - job.setReducerClass(ReducerToCassandra.class); - - job.setMapOutputKeyClass(Text.class); - job.setMapOutputValueClass(IntWritable.class); - job.setOutputKeyClass(ByteBuffer.class); - job.setOutputValueClass(List.class); - - job.setOutputFormatClass(ColumnFamilyOutputFormat.class); - - ConfigHelper.setOutputColumnFamily(job.getConfiguration(), KEYSPACE, OUTPUT_COLUMN_FAMILY); - job.getConfiguration().set(CONF_COLUMN_NAME, "sum"); - } - - job.setInputFormatClass(ColumnFamilyInputFormat.class); - - ConfigHelper.setInputRpcPort(job.getConfiguration(), "9160"); - ConfigHelper.setInputInitialAddress(job.getConfiguration(), "localhost"); - ConfigHelper.setInputPartitioner(job.getConfiguration(), "Murmur3Partitioner"); - ConfigHelper.setInputColumnFamily(job.getConfiguration(), KEYSPACE, COLUMN_FAMILY); - SlicePredicate predicate = new SlicePredicate().setColumn_names(Arrays.asList(ByteBufferUtil.bytes(columnName))); - ConfigHelper.setInputSlicePredicate(job.getConfiguration(), predicate); - - if (i == 4) - { - IndexExpression expr = new IndexExpression(ByteBufferUtil.bytes("int4"), IndexOperator.EQ, ByteBufferUtil.bytes(0)); - 
ConfigHelper.setInputRange(job.getConfiguration(), Arrays.asList(expr)); - } - - if (i == 5) - { - // this will cause the predicate to be ignored in favor of scanning everything as a wide row - ConfigHelper.setInputColumnFamily(job.getConfiguration(), KEYSPACE, COLUMN_FAMILY, true); - } - - ConfigHelper.setOutputInitialAddress(job.getConfiguration(), "localhost"); - ConfigHelper.setOutputPartitioner(job.getConfiguration(), "Murmur3Partitioner"); - - job.waitForCompletion(true); - } - return 0; - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/446e2537/examples/hadoop_word_count/src/WordCountCounters.java ---------------------------------------------------------------------- diff --git a/examples/hadoop_word_count/src/WordCountCounters.java b/examples/hadoop_word_count/src/WordCountCounters.java deleted file mode 100644 index 98c8579..0000000 --- a/examples/hadoop_word_count/src/WordCountCounters.java +++ /dev/null @@ -1,104 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.SortedMap; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.cassandra.hadoop.ColumnFamilyInputFormat; -import org.apache.cassandra.hadoop.ColumnFamilyRecordReader; -import org.apache.cassandra.hadoop.ConfigHelper; -import org.apache.cassandra.thrift.SlicePredicate; -import org.apache.cassandra.thrift.SliceRange; -import org.apache.cassandra.utils.ByteBufferUtil; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.conf.Configured; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.Mapper; -import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; -import org.apache.hadoop.util.Tool; -import org.apache.hadoop.util.ToolRunner; - -/** - * This sums the word count stored in the input_words_count ColumnFamily for the key "key-if-verse1". - * - * Output is written to a text file. 
- */ -public class WordCountCounters extends Configured implements Tool -{ - private static final Logger logger = LoggerFactory.getLogger(WordCountCounters.class); - - static final String COUNTER_COLUMN_FAMILY = "input_words_count"; - private static final String OUTPUT_PATH_PREFIX = "/tmp/word_count_counters"; - - - public static void main(String[] args) throws Exception - { - // Let ToolRunner handle generic command-line options - ToolRunner.run(new Configuration(), new WordCountCounters(), args); - System.exit(0); - } - - public static class SumMapper extends Mapper<ByteBuffer, SortedMap<ByteBuffer, ColumnFamilyRecordReader.Column>, Text, LongWritable> - { - public void map(ByteBuffer key, SortedMap<ByteBuffer, ColumnFamilyRecordReader.Column> columns, Context context) throws IOException, InterruptedException - { - long sum = 0; - for (ColumnFamilyRecordReader.Column column : columns.values()) - { - logger.debug("read " + key + ":" + ByteBufferUtil.string(column.name) + " from " + context.getInputSplit()); - sum += ByteBufferUtil.toLong(column.value); - } - context.write(new Text(ByteBufferUtil.string(key)), new LongWritable(sum)); - } - } - - public int run(String[] args) throws Exception - { - Job job = new Job(getConf(), "wordcountcounters"); - job.setJarByClass(WordCountCounters.class); - job.setMapperClass(SumMapper.class); - - job.setOutputKeyClass(Text.class); - job.setOutputValueClass(LongWritable.class); - FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH_PREFIX)); - - - job.setInputFormatClass(ColumnFamilyInputFormat.class); - - ConfigHelper.setInputRpcPort(job.getConfiguration(), "9160"); - ConfigHelper.setInputInitialAddress(job.getConfiguration(), "localhost"); - ConfigHelper.setInputPartitioner(job.getConfiguration(), "org.apache.cassandra.dht.Murmur3Partitioner"); - ConfigHelper.setInputColumnFamily(job.getConfiguration(), WordCount.KEYSPACE, WordCountCounters.COUNTER_COLUMN_FAMILY); - SlicePredicate predicate = new SlicePredicate().setSlice_range( - new SliceRange(). - setStart(ByteBufferUtil.EMPTY_BYTE_BUFFER). - setFinish(ByteBufferUtil.EMPTY_BYTE_BUFFER). - setCount(100)); - ConfigHelper.setInputSlicePredicate(job.getConfiguration(), predicate); - - job.waitForCompletion(true); - return 0; - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/446e2537/examples/hadoop_word_count/src/WordCountSetup.java ---------------------------------------------------------------------- diff --git a/examples/hadoop_word_count/src/WordCountSetup.java b/examples/hadoop_word_count/src/WordCountSetup.java deleted file mode 100644 index 0ef5341..0000000 --- a/examples/hadoop_word_count/src/WordCountSetup.java +++ /dev/null @@ -1,239 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -import java.nio.ByteBuffer; -import java.util.*; -import java.util.concurrent.TimeUnit; - -import org.apache.cassandra.thrift.*; -import org.apache.cassandra.utils.ByteBufferUtil; -import org.apache.thrift.TException; -import org.apache.thrift.protocol.TBinaryProtocol; -import org.apache.thrift.protocol.TProtocol; -import org.apache.thrift.transport.TFramedTransport; -import org.apache.thrift.transport.TSocket; -import org.apache.thrift.transport.TTransport; -import org.apache.thrift.transport.TTransportException; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.util.concurrent.Uninterruptibles; - -public class WordCountSetup -{ - private static final Logger logger = LoggerFactory.getLogger(WordCountSetup.class); - - public static final int TEST_COUNT = 6; - - public static void main(String[] args) throws Exception - { - Cassandra.Iface client = createConnection(); - - setupKeyspace(client); - - client.set_keyspace(WordCount.KEYSPACE); - - Map<ByteBuffer, Map<String, List<Mutation>>> mutationMap; - Column c; - - // text0: no rows - - // text1: 1 row, 1 word - c = new Column() - .setName(ByteBufferUtil.bytes("text1")) - .setValue(ByteBufferUtil.bytes("word1")) - .setTimestamp(System.currentTimeMillis()); - mutationMap = getMutationMap(ByteBufferUtil.bytes("key0"), WordCount.COLUMN_FAMILY, c); - client.batch_mutate(mutationMap, ConsistencyLevel.ONE); - logger.info("added text1"); - - // text1: 1 row, 2 word - c = new Column() - .setName(ByteBufferUtil.bytes("text2")) - .setValue(ByteBufferUtil.bytes("word1 word2")) - .setTimestamp(System.currentTimeMillis()); - mutationMap = getMutationMap(ByteBufferUtil.bytes("key0"), WordCount.COLUMN_FAMILY, c); - client.batch_mutate(mutationMap, ConsistencyLevel.ONE); - logger.info("added text2"); - - // text3: 1000 rows, 1 word - mutationMap = new HashMap<ByteBuffer, Map<String, List<Mutation>>>(); - for (int i = 0; i < 1000; i++) - { - c = new Column() - .setName(ByteBufferUtil.bytes("text3")) - .setValue(ByteBufferUtil.bytes("word1")) - .setTimestamp(System.currentTimeMillis()); - addToMutationMap(mutationMap, ByteBufferUtil.bytes("key" + i), WordCount.COLUMN_FAMILY, c); - } - client.batch_mutate(mutationMap, ConsistencyLevel.ONE); - logger.info("added text3"); - - // text4: 1000 rows, 1 word, one column to filter on - mutationMap = new HashMap<ByteBuffer, Map<String, List<Mutation>>>(); - for (int i = 0; i < 1000; i++) - { - Column c1 = new Column() - .setName(ByteBufferUtil.bytes("text4")) - .setValue(ByteBufferUtil.bytes("word1")) - .setTimestamp(System.currentTimeMillis()); - Column c2 = new Column() - .setName(ByteBufferUtil.bytes("int4")) - .setValue(ByteBufferUtil.bytes(i % 4)) - .setTimestamp(System.currentTimeMillis()); - ByteBuffer key = ByteBufferUtil.bytes("key" + i); - addToMutationMap(mutationMap, key, WordCount.COLUMN_FAMILY, c1); - addToMutationMap(mutationMap, key, WordCount.COLUMN_FAMILY, c2); - } - client.batch_mutate(mutationMap, ConsistencyLevel.ONE); - logger.info("added text4"); - - // sentence data for the counters - final ByteBuffer key = ByteBufferUtil.bytes("key-if-verse1"); - final ColumnParent colParent = new ColumnParent(WordCountCounters.COUNTER_COLUMN_FAMILY); - for (String sentence : sentenceData()) - { - client.add(key, - colParent, - new CounterColumn(ByteBufferUtil.bytes(sentence), - (long) sentence.split("\\s").length), - ConsistencyLevel.ONE); - } - logger.info("added key-if-verse1"); - - System.exit(0); - } - - private static Map<ByteBuffer, Map<String, 
List<Mutation>>> getMutationMap(ByteBuffer key, String cf, Column c) - { - Map<ByteBuffer, Map<String, List<Mutation>>> mutationMap = new HashMap<ByteBuffer, Map<String, List<Mutation>>>(); - addToMutationMap(mutationMap, key, cf, c); - return mutationMap; - } - - private static void addToMutationMap(Map<ByteBuffer, Map<String, List<Mutation>>> mutationMap, ByteBuffer key, String cf, Column c) - { - Map<String, List<Mutation>> cfMutation = mutationMap.get(key); - if (cfMutation == null) - { - cfMutation = new HashMap<String, List<Mutation>>(); - mutationMap.put(key, cfMutation); - } - - List<Mutation> mutationList = cfMutation.get(cf); - if (mutationList == null) - { - mutationList = new ArrayList<Mutation>(); - cfMutation.put(cf, mutationList); - } - - ColumnOrSuperColumn cc = new ColumnOrSuperColumn(); - Mutation m = new Mutation(); - - cc.setColumn(c); - m.setColumn_or_supercolumn(cc); - mutationList.add(m); - } - - private static void setupKeyspace(Cassandra.Iface client) throws TException, InvalidRequestException, SchemaDisagreementException - { - List<CfDef> cfDefList = new ArrayList<CfDef>(); - CfDef input = new CfDef(WordCount.KEYSPACE, WordCount.COLUMN_FAMILY); - input.setComparator_type("AsciiType"); - input.setColumn_metadata(Arrays.asList(new ColumnDef(ByteBufferUtil.bytes("text1"), "AsciiType"), - new ColumnDef(ByteBufferUtil.bytes("text2"), "AsciiType"), - new ColumnDef(ByteBufferUtil.bytes("text3"), "AsciiType"), - new ColumnDef(ByteBufferUtil.bytes("text4"), "AsciiType"), - new ColumnDef(ByteBufferUtil.bytes("int4"), "Int32Type").setIndex_name("int4idx").setIndex_type(IndexType.KEYS))); - cfDefList.add(input); - - CfDef output = new CfDef(WordCount.KEYSPACE, WordCount.OUTPUT_COLUMN_FAMILY); - output.setComparator_type("AsciiType"); - output.setDefault_validation_class("Int32Type"); - cfDefList.add(output); - - CfDef counterInput = new CfDef(WordCount.KEYSPACE, WordCountCounters.COUNTER_COLUMN_FAMILY); - counterInput.setComparator_type("UTF8Type"); - counterInput.setDefault_validation_class("CounterColumnType"); - cfDefList.add(counterInput); - - KsDef ksDef = new KsDef(WordCount.KEYSPACE, "org.apache.cassandra.locator.SimpleStrategy", cfDefList); - ksDef.putToStrategy_options("replication_factor", "1"); - client.system_add_keyspace(ksDef); - - int magnitude = getNumberOfHosts(client); - Uninterruptibles.sleepUninterruptibly(magnitude, TimeUnit.SECONDS); - } - - private static int getNumberOfHosts(Cassandra.Iface client) - throws InvalidRequestException, UnavailableException, TimedOutException, TException - { - client.set_keyspace("system"); - SlicePredicate predicate = new SlicePredicate(); - SliceRange sliceRange = new SliceRange(); - sliceRange.setStart(new byte[0]); - sliceRange.setFinish(new byte[0]); - predicate.setSlice_range(sliceRange); - - KeyRange keyrRange = new KeyRange(); - keyrRange.setStart_key(new byte[0]); - keyrRange.setEnd_key(new byte[0]); - //keyrRange.setCount(100); - - ColumnParent parent = new ColumnParent("peers"); - - List<KeySlice> ls = client.get_range_slices(parent, predicate, keyrRange, ConsistencyLevel.ONE); - - return ls.size(); - } - - private static Cassandra.Iface createConnection() throws TTransportException - { - if (System.getProperty("cassandra.host") == null || System.getProperty("cassandra.port") == null) - { - logger.warn("cassandra.host or cassandra.port is not defined, using default"); - } - return createConnection(System.getProperty("cassandra.host", "localhost"), - Integer.valueOf(System.getProperty("cassandra.port", "9160"))); 
- } - - private static Cassandra.Client createConnection(String host, Integer port) throws TTransportException - { - TSocket socket = new TSocket(host, port); - TTransport trans = new TFramedTransport(socket); - trans.open(); - TProtocol protocol = new TBinaryProtocol(trans); - - return new Cassandra.Client(protocol); - } - - private static String[] sentenceData() - { // Public domain context, source http://en.wikisource.org/wiki/If%E2%80%94 - return new String[]{ - "If you can keep your head when all about you", - "Are losing theirs and blaming it on you", - "If you can trust yourself when all men doubt you,", - "But make allowance for their doubting too:", - "If you can wait and not be tired by waiting,", - "Or being lied about, don't deal in lies,", - "Or being hated, don't give way to hating,", - "And yet don't look too good, nor talk too wise;" - }; - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/446e2537/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java b/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java deleted file mode 100644 index 4dd53ff..0000000 --- a/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java +++ /dev/null @@ -1,296 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License.
- */ -package org.apache.cassandra.hadoop; - -import java.io.IOException; -import java.util.*; -import java.util.concurrent.*; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.datastax.driver.core.Host; -import com.datastax.driver.core.Metadata; -import com.datastax.driver.core.ResultSet; -import com.datastax.driver.core.Row; -import com.datastax.driver.core.Session; -import com.datastax.driver.core.TokenRange; -import org.apache.cassandra.db.SystemKeyspace; -import org.apache.cassandra.dht.ByteOrderedPartitioner; -import org.apache.cassandra.dht.IPartitioner; -import org.apache.cassandra.dht.OrderPreservingPartitioner; -import org.apache.cassandra.dht.Range; -import org.apache.cassandra.dht.Token; -import org.apache.cassandra.hadoop.cql3.*; -import org.apache.cassandra.thrift.KeyRange; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapreduce.*; - -public abstract class AbstractColumnFamilyInputFormat<K, Y> extends InputFormat<K, Y> implements org.apache.hadoop.mapred.InputFormat<K, Y> -{ - private static final Logger logger = LoggerFactory.getLogger(AbstractColumnFamilyInputFormat.class); - - public static final String MAPRED_TASK_ID = "mapred.task.id"; - // The simple fact that we need this is because the old Hadoop API wants us to "write" - // to the key and value whereas the new asks for it. - // I choose 8kb as the default max key size (instantiated only once), but you can - // override it in your jobConf with this setting. - public static final String CASSANDRA_HADOOP_MAX_KEY_SIZE = "cassandra.hadoop.max_key_size"; - public static final int CASSANDRA_HADOOP_MAX_KEY_SIZE_DEFAULT = 8192; - - private String keyspace; - private String cfName; - private IPartitioner partitioner; - private Session session; - - protected void validateConfiguration(Configuration conf) - { - if (ConfigHelper.getInputKeyspace(conf) == null || ConfigHelper.getInputColumnFamily(conf) == null) - { - throw new UnsupportedOperationException("you must set the keyspace and table with setInputColumnFamily()"); - } - if (ConfigHelper.getInputInitialAddress(conf) == null) - throw new UnsupportedOperationException("You must set the initial output address to a Cassandra node with setInputInitialAddress"); - if (ConfigHelper.getInputPartitioner(conf) == null) - throw new UnsupportedOperationException("You must set the Cassandra partitioner class with setInputPartitioner"); - } - - public List<InputSplit> getSplits(JobContext context) throws IOException - { - Configuration conf = HadoopCompat.getConfiguration(context); - - validateConfiguration(conf); - - keyspace = ConfigHelper.getInputKeyspace(conf); - cfName = ConfigHelper.getInputColumnFamily(conf); - partitioner = ConfigHelper.getInputPartitioner(conf); - logger.debug("partitioner is {}", partitioner); - - // canonical ranges and nodes holding replicas - Map<TokenRange, Set<Host>> masterRangeNodes = getRangeMap(conf, keyspace); - - // canonical ranges, split into pieces, fetching the splits in parallel - ExecutorService executor = new ThreadPoolExecutor(0, 128, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>()); - List<InputSplit> splits = new ArrayList<>(); - - try - { - List<Future<List<InputSplit>>> splitfutures = new ArrayList<>(); - KeyRange jobKeyRange = ConfigHelper.getInputKeyRange(conf); - Range<Token> jobRange = null; - if (jobKeyRange != null) - { - if (jobKeyRange.start_key != null) - { - if (!partitioner.preservesOrder()) - throw new 
UnsupportedOperationException("KeyRange based on keys can only be used with a order preserving partitioner"); - if (jobKeyRange.start_token != null) - throw new IllegalArgumentException("only start_key supported"); - if (jobKeyRange.end_token != null) - throw new IllegalArgumentException("only start_key supported"); - jobRange = new Range<>(partitioner.getToken(jobKeyRange.start_key), - partitioner.getToken(jobKeyRange.end_key)); - } - else if (jobKeyRange.start_token != null) - { - jobRange = new Range<>(partitioner.getTokenFactory().fromString(jobKeyRange.start_token), - partitioner.getTokenFactory().fromString(jobKeyRange.end_token)); - } - else - { - logger.warn("ignoring jobKeyRange specified without start_key or start_token"); - } - } - - session = CqlConfigHelper.getInputCluster(ConfigHelper.getInputInitialAddress(conf).split(","), conf).connect(); - Metadata metadata = session.getCluster().getMetadata(); - - for (TokenRange range : masterRangeNodes.keySet()) - { - if (jobRange == null) - { - // for each tokenRange, pick a live owner and ask it to compute bite-sized splits - splitfutures.add(executor.submit(new SplitCallable(range, masterRangeNodes.get(range), conf))); - } - else - { - TokenRange jobTokenRange = rangeToTokenRange(metadata, jobRange); - if (range.intersects(jobTokenRange)) - { - for (TokenRange intersection: range.intersectWith(jobTokenRange)) - { - // for each tokenRange, pick a live owner and ask it to compute bite-sized splits - splitfutures.add(executor.submit(new SplitCallable(intersection, masterRangeNodes.get(range), conf))); - } - } - } - } - - // wait until we have all the results back - for (Future<List<InputSplit>> futureInputSplits : splitfutures) - { - try - { - splits.addAll(futureInputSplits.get()); - } - catch (Exception e) - { - throw new IOException("Could not get input splits", e); - } - } - } - finally - { - executor.shutdownNow(); - } - - assert splits.size() > 0; - Collections.shuffle(splits, new Random(System.nanoTime())); - return splits; - } - - private TokenRange rangeToTokenRange(Metadata metadata, Range<Token> range) - { - return metadata.newTokenRange(metadata.newToken(partitioner.getTokenFactory().toString(range.left)), - metadata.newToken(partitioner.getTokenFactory().toString(range.right))); - } - - /** - * Gets a token tokenRange and splits it up according to the suggested - * size into input splits that Hadoop can use. - */ - class SplitCallable implements Callable<List<InputSplit>> - { - - private final TokenRange tokenRange; - private final Set<Host> hosts; - private final Configuration conf; - - public SplitCallable(TokenRange tr, Set<Host> hosts, Configuration conf) - { - this.tokenRange = tr; - this.hosts = hosts; - this.conf = conf; - } - - public List<InputSplit> call() throws Exception - { - ArrayList<InputSplit> splits = new ArrayList<>(); - Map<TokenRange, Long> subSplits; - subSplits = getSubSplits(keyspace, cfName, tokenRange, conf); - // turn the sub-ranges into InputSplits - String[] endpoints = new String[hosts.size()]; - - // hadoop needs hostname, not ip - int endpointIndex = 0; - for (Host endpoint : hosts) - endpoints[endpointIndex++] = endpoint.getAddress().getHostName(); - - boolean partitionerIsOpp = partitioner instanceof OrderPreservingPartitioner || partitioner instanceof ByteOrderedPartitioner; - - for (TokenRange subSplit : subSplits.keySet()) - { - List<TokenRange> ranges = subSplit.unwrap(); - for (TokenRange subrange : ranges) - { - ColumnFamilySplit split = - new ColumnFamilySplit( - partitionerIsOpp ? 
- subrange.getStart().toString().substring(2) : subrange.getStart().toString(), - partitionerIsOpp ? - subrange.getEnd().toString().substring(2) : subrange.getStart().toString(), - subSplits.get(subSplit), - endpoints); - - logger.debug("adding {}", split); - splits.add(split); - } - } - return splits; - } - } - - private Map<TokenRange, Long> getSubSplits(String keyspace, String cfName, TokenRange range, Configuration conf) throws IOException - { - int splitSize = ConfigHelper.getInputSplitSize(conf); - try - { - return describeSplits(keyspace, cfName, range, splitSize); - } - catch (Exception e) - { - throw new RuntimeException(e); - } - } - - private Map<TokenRange, Set<Host>> getRangeMap(Configuration conf, String keyspace) - { - try (Session session = CqlConfigHelper.getInputCluster(ConfigHelper.getInputInitialAddress(conf).split(","), conf).connect()) - { - Map<TokenRange, Set<Host>> map = new HashMap<>(); - Metadata metadata = session.getCluster().getMetadata(); - for (TokenRange tokenRange : metadata.getTokenRanges()) - map.put(tokenRange, metadata.getReplicas('"' + keyspace + '"', tokenRange)); - return map; - } - } - - private Map<TokenRange, Long> describeSplits(String keyspace, String table, TokenRange tokenRange, int splitSize) - { - String query = String.format("SELECT mean_partition_size, partitions_count " + - "FROM %s.%s " + - "WHERE keyspace_name = ? AND table_name = ? AND range_start = ? AND range_end = ?", - SystemKeyspace.NAME, - SystemKeyspace.SIZE_ESTIMATES); - - ResultSet resultSet = session.execute(query, keyspace, table, tokenRange.getStart().toString(), tokenRange.getEnd().toString()); - - Row row = resultSet.one(); - // If we have no data on this split, return the full split i.e., do not sub-split - // Assume smallest granularity of partition count available from CASSANDRA-7688 - if (row == null) - { - Map<TokenRange, Long> wrappedTokenRange = new HashMap<>(); - wrappedTokenRange.put(tokenRange, (long) 128); - return wrappedTokenRange; - } - - long meanPartitionSize = row.getLong("mean_partition_size"); - long partitionCount = row.getLong("partitions_count"); - - int splitCount = (int)((meanPartitionSize * partitionCount) / splitSize); - List<TokenRange> splitRanges = tokenRange.splitEvenly(splitCount); - Map<TokenRange, Long> rangesWithLength = new HashMap<>(); - for (TokenRange range : splitRanges) - rangesWithLength.put(range, partitionCount/splitCount); - - return rangesWithLength; - } - - // Old Hadoop API - public org.apache.hadoop.mapred.InputSplit[] getSplits(JobConf jobConf, int numSplits) throws IOException - { - TaskAttemptContext tac = HadoopCompat.newTaskAttemptContext(jobConf, new TaskAttemptID()); - List<org.apache.hadoop.mapreduce.InputSplit> newInputSplits = this.getSplits(tac); - org.apache.hadoop.mapred.InputSplit[] oldInputSplits = new org.apache.hadoop.mapred.InputSplit[newInputSplits.size()]; - for (int i = 0; i < newInputSplits.size(); i++) - oldInputSplits[i] = (ColumnFamilySplit)newInputSplits.get(i); - return oldInputSplits; - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/446e2537/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java b/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java deleted file mode 100644 index 5282279..0000000 --- a/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java +++ /dev/null @@ -1,91 +0,0 @@ -/* - * Licensed to the Apache Software 
Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.hadoop; - - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.List; - -import org.apache.cassandra.thrift.Mutation; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.mapreduce.*; - -@Deprecated -public class BulkOutputFormat extends OutputFormat<ByteBuffer,List<Mutation>> - implements org.apache.hadoop.mapred.OutputFormat<ByteBuffer,List<Mutation>> -{ - /** Fills the deprecated OutputFormat interface for streaming. */ - @Deprecated - public BulkRecordWriter getRecordWriter(org.apache.hadoop.fs.FileSystem filesystem, org.apache.hadoop.mapred.JobConf job, String name, org.apache.hadoop.util.Progressable progress) throws IOException - { - return new BulkRecordWriter(job, progress); - } - - @Override - public BulkRecordWriter getRecordWriter(final TaskAttemptContext context) throws IOException, InterruptedException - { - return new BulkRecordWriter(context); - } - - - @Override - public void checkOutputSpecs(JobContext context) - { - checkOutputSpecs(HadoopCompat.getConfiguration(context)); - } - - private void checkOutputSpecs(Configuration conf) - { - if (ConfigHelper.getOutputKeyspace(conf) == null) - { - throw new UnsupportedOperationException("you must set the keyspace with setColumnFamily()"); - } - } - - @Override - public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException - { - return new NullOutputCommitter(); - } - - /** Fills the deprecated OutputFormat interface for streaming. */ - @Deprecated - public void checkOutputSpecs(org.apache.hadoop.fs.FileSystem filesystem, org.apache.hadoop.mapred.JobConf job) throws IOException - { - checkOutputSpecs(job); - } - - public static class NullOutputCommitter extends OutputCommitter - { - public void abortTask(TaskAttemptContext taskContext) { } - - public void cleanupJob(JobContext jobContext) { } - - public void commitTask(TaskAttemptContext taskContext) { } - - public boolean needsTaskCommit(TaskAttemptContext taskContext) - { - return false; - } - - public void setupJob(JobContext jobContext) { } - - public void setupTask(TaskAttemptContext taskContext) { } - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/446e2537/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java b/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java deleted file mode 100644 index 99abf9f..0000000 --- a/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java +++ /dev/null @@ -1,296 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.hadoop; - -import java.io.Closeable; -import java.io.File; -import java.io.IOException; -import java.net.InetAddress; -import java.net.UnknownHostException; -import java.nio.ByteBuffer; -import java.util.*; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.cassandra.config.Config; -import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.db.marshal.AbstractType; -import org.apache.cassandra.db.marshal.BytesType; -import org.apache.cassandra.hadoop.cql3.CqlConfigHelper; -import org.apache.cassandra.io.sstable.SSTableLoader; -import org.apache.cassandra.io.sstable.SSTableSimpleUnsortedWriter; -import org.apache.cassandra.streaming.StreamState; -import org.apache.cassandra.thrift.Column; -import org.apache.cassandra.thrift.CounterColumn; -import org.apache.cassandra.thrift.Mutation; -import org.apache.cassandra.utils.NativeSSTableLoaderClient; -import org.apache.cassandra.utils.OutputHandler; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.mapreduce.RecordWriter; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.util.Progressable; - -@Deprecated -public final class BulkRecordWriter extends RecordWriter<ByteBuffer, List<Mutation>> - implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer, List<Mutation>> -{ - public final static String OUTPUT_LOCATION = "mapreduce.output.bulkoutputformat.localdir"; - public final static String BUFFER_SIZE_IN_MB = "mapreduce.output.bulkoutputformat.buffersize"; - public final static String STREAM_THROTTLE_MBITS = "mapreduce.output.bulkoutputformat.streamthrottlembits"; - public final static String MAX_FAILED_HOSTS = "mapreduce.output.bulkoutputformat.maxfailedhosts"; - - private final Logger logger = LoggerFactory.getLogger(BulkRecordWriter.class); - - protected final Configuration conf; - protected final int maxFailures; - protected final int bufferSize; - protected Closeable writer; - protected SSTableLoader loader; - protected Progressable progress; - protected TaskAttemptContext context; - private File outputDir; - - - private enum CFType - { - NORMAL, - SUPER, - } - - private enum ColType - { - NORMAL, - COUNTER - } - - private CFType cfType; - private ColType colType; - - BulkRecordWriter(TaskAttemptContext context) - { - - this(HadoopCompat.getConfiguration(context)); - this.context = context; - } - - BulkRecordWriter(Configuration conf, Progressable progress) - { - this(conf); - this.progress = progress; - } - - BulkRecordWriter(Configuration conf) - { - Config.setOutboundBindAny(true); - this.conf = conf; - 
DatabaseDescriptor.setStreamThroughputOutboundMegabitsPerSec(Integer.parseInt(conf.get(STREAM_THROTTLE_MBITS, "0"))); - maxFailures = Integer.parseInt(conf.get(MAX_FAILED_HOSTS, "0")); - bufferSize = Integer.parseInt(conf.get(BUFFER_SIZE_IN_MB, "64")); - } - - protected String getOutputLocation() throws IOException - { - String dir = conf.get(OUTPUT_LOCATION, System.getProperty("java.io.tmpdir")); - if (dir == null) - throw new IOException("Output directory not defined, if hadoop is not setting java.io.tmpdir then define " + OUTPUT_LOCATION); - return dir; - } - - private void setTypes(Mutation mutation) - { - if (cfType == null) - { - if (mutation.getColumn_or_supercolumn().isSetSuper_column() || mutation.getColumn_or_supercolumn().isSetCounter_super_column()) - cfType = CFType.SUPER; - else - cfType = CFType.NORMAL; - if (mutation.getColumn_or_supercolumn().isSetCounter_column() || mutation.getColumn_or_supercolumn().isSetCounter_super_column()) - colType = ColType.COUNTER; - else - colType = ColType.NORMAL; - } - } - - private void prepareWriter() throws IOException - { - if (outputDir == null) - { - String keyspace = ConfigHelper.getOutputKeyspace(conf); - //dir must be named by ks/cf for the loader - outputDir = new File(getOutputLocation() + File.separator + keyspace + File.separator + ConfigHelper.getOutputColumnFamily(conf)); - outputDir.mkdirs(); - } - - if (writer == null) - { - AbstractType<?> subcomparator = null; - - if (cfType == CFType.SUPER) - subcomparator = BytesType.instance; - - writer = new SSTableSimpleUnsortedWriter( - outputDir, - ConfigHelper.getOutputPartitioner(conf), - ConfigHelper.getOutputKeyspace(conf), - ConfigHelper.getOutputColumnFamily(conf), - BytesType.instance, - subcomparator, - Integer.parseInt(conf.get(BUFFER_SIZE_IN_MB, "64")), - ConfigHelper.getOutputCompressionParamaters(conf)); - - this.loader = new SSTableLoader(outputDir, new ExternalClient(conf), new NullOutputHandler()); - } - } - - @Override - public void close(TaskAttemptContext context) throws IOException - { - close(); - } - - /** Fills the deprecated RecordWriter interface for streaming. 
*/ - @Deprecated - public void close(org.apache.hadoop.mapred.Reporter reporter) throws IOException - { - close(); - } - - private void close() throws IOException - { - if (writer != null) - { - writer.close(); - Future<StreamState> future = loader.stream(); - while (true) - { - try - { - future.get(1000, TimeUnit.MILLISECONDS); - break; - } - catch (ExecutionException | TimeoutException te) - { - if (null != progress) - progress.progress(); - if (null != context) - HadoopCompat.progress(context); - } - catch (InterruptedException e) - { - throw new IOException(e); - } - } - if (loader.getFailedHosts().size() > 0) - { - if (loader.getFailedHosts().size() > maxFailures) - throw new IOException("Too many hosts failed: " + loader.getFailedHosts()); - else - logger.warn("Some hosts failed: {}", loader.getFailedHosts()); - } - } - } - - @Override - public void write(ByteBuffer keybuff, List<Mutation> value) throws IOException - { - setTypes(value.get(0)); - prepareWriter(); - SSTableSimpleUnsortedWriter ssWriter = (SSTableSimpleUnsortedWriter) writer; - ssWriter.newRow(keybuff); - for (Mutation mut : value) - { - if (cfType == CFType.SUPER) - { - ssWriter.newSuperColumn(mut.getColumn_or_supercolumn().getSuper_column().name); - if (colType == ColType.COUNTER) - for (CounterColumn column : mut.getColumn_or_supercolumn().getCounter_super_column().columns) - ssWriter.addCounterColumn(column.name, column.value); - else - { - for (Column column : mut.getColumn_or_supercolumn().getSuper_column().columns) - { - if(column.ttl == 0) - ssWriter.addColumn(column.name, column.value, column.timestamp); - else - ssWriter.addExpiringColumn(column.name, column.value, column.timestamp, column.ttl, System.currentTimeMillis() + ((long)column.ttl * 1000)); - } - } - } - else - { - if (colType == ColType.COUNTER) - ssWriter.addCounterColumn(mut.getColumn_or_supercolumn().counter_column.name, mut.getColumn_or_supercolumn().counter_column.value); - else - { - if(mut.getColumn_or_supercolumn().column.ttl == 0) - ssWriter.addColumn(mut.getColumn_or_supercolumn().column.name, mut.getColumn_or_supercolumn().column.value, mut.getColumn_or_supercolumn().column.timestamp); - else - ssWriter.addExpiringColumn(mut.getColumn_or_supercolumn().column.name, mut.getColumn_or_supercolumn().column.value, mut.getColumn_or_supercolumn().column.timestamp, mut.getColumn_or_supercolumn().column.ttl, System.currentTimeMillis() + ((long)(mut.getColumn_or_supercolumn().column.ttl) * 1000)); - } - } - if (null != progress) - progress.progress(); - if (null != context) - HadoopCompat.progress(context); - } - } - - public static class ExternalClient extends NativeSSTableLoaderClient - { - public ExternalClient(Configuration conf) - { - super(resolveHostAddresses(conf), - CqlConfigHelper.getOutputNativePort(conf), - ConfigHelper.getOutputKeyspaceUserName(conf), - ConfigHelper.getOutputKeyspacePassword(conf), - CqlConfigHelper.getSSLOptions(conf).orNull()); - } - - private static Collection<InetAddress> resolveHostAddresses(Configuration conf) - { - Set<InetAddress> addresses = new HashSet<>(); - - for (String host : ConfigHelper.getOutputInitialAddress(conf).split(",")) - { - try - { - addresses.add(InetAddress.getByName(host)); - } - catch (UnknownHostException e) - { - throw new RuntimeException(e); - } - } - - return addresses; - } - } - - public static class NullOutputHandler implements OutputHandler - { - public void output(String msg) {} - public void debug(String msg) {} - public void warn(String msg) {} - public void warn(String msg, 
Throwable th) {} - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/446e2537/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java deleted file mode 100644 index 4662fa5..0000000 --- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java +++ /dev/null @@ -1,125 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.hadoop; - -import java.io.IOException; - -import java.nio.ByteBuffer; -import java.util.*; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.cassandra.auth.PasswordAuthenticator; -import org.apache.cassandra.thrift.*; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.Reporter; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.RecordReader; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.mapreduce.TaskAttemptID; -import org.apache.thrift.protocol.TBinaryProtocol; -import org.apache.thrift.protocol.TProtocol; -import org.apache.thrift.transport.TTransport; -import org.apache.thrift.transport.TTransportException; - -/** - * Hadoop InputFormat allowing map/reduce against Cassandra rows within one ColumnFamily. - * - * At minimum, you need to set the CF and predicate (description of columns to extract from each row) - * in your Hadoop job Configuration. The ConfigHelper class is provided to make this - * simple: - * ConfigHelper.setInputColumnFamily - * ConfigHelper.setInputSlicePredicate - * - * You can also configure the number of rows per InputSplit with - * ConfigHelper.setInputSplitSize - * This should be "as big as possible, but no bigger." Each InputSplit is read from Cassandra - * with multiple get_range_slices queries, and the per-call overhead of get_range_slices is high, - * so larger split sizes are better -- but if it is too large, you will run out of memory. - * - * The default split size is 64k rows.
- */ -@Deprecated -public class ColumnFamilyInputFormat extends AbstractColumnFamilyInputFormat<ByteBuffer, SortedMap<ByteBuffer, ColumnFamilyRecordReader.Column>> -{ - private static final Logger logger = LoggerFactory.getLogger(ColumnFamilyInputFormat.class); - - @SuppressWarnings("resource") - public static Cassandra.Client createAuthenticatedClient(String location, int port, Configuration conf) throws Exception - { - logger.debug("Creating authenticated client for CF input format"); - TTransport transport; - try - { - transport = ConfigHelper.getClientTransportFactory(conf).openTransport(location, port); - } - catch (Exception e) - { - throw new TTransportException("Failed to open a transport to " + location + ":" + port + ".", e); - } - TProtocol binaryProtocol = new TBinaryProtocol(transport, true, true); - Cassandra.Client client = new Cassandra.Client(binaryProtocol); - - // log in - client.set_keyspace(ConfigHelper.getInputKeyspace(conf)); - if ((ConfigHelper.getInputKeyspaceUserName(conf) != null) && (ConfigHelper.getInputKeyspacePassword(conf) != null)) - { - Map<String, String> creds = new HashMap<String, String>(); - creds.put(PasswordAuthenticator.USERNAME_KEY, ConfigHelper.getInputKeyspaceUserName(conf)); - creds.put(PasswordAuthenticator.PASSWORD_KEY, ConfigHelper.getInputKeyspacePassword(conf)); - AuthenticationRequest authRequest = new AuthenticationRequest(creds); - client.login(authRequest); - } - logger.debug("Authenticated client for CF input format created successfully"); - return client; - } - - public RecordReader<ByteBuffer, SortedMap<ByteBuffer, ColumnFamilyRecordReader.Column>> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException - { - return new ColumnFamilyRecordReader(); - } - - public org.apache.hadoop.mapred.RecordReader<ByteBuffer, SortedMap<ByteBuffer, ColumnFamilyRecordReader.Column>> getRecordReader(org.apache.hadoop.mapred.InputSplit split, JobConf jobConf, final Reporter reporter) throws IOException - { - TaskAttemptContext tac = HadoopCompat.newMapContext( - jobConf, - TaskAttemptID.forName(jobConf.get(MAPRED_TASK_ID)), - null, - null, - null, - new ReporterWrapper(reporter), - null); - - ColumnFamilyRecordReader recordReader = new ColumnFamilyRecordReader(jobConf.getInt(CASSANDRA_HADOOP_MAX_KEY_SIZE, CASSANDRA_HADOOP_MAX_KEY_SIZE_DEFAULT)); - recordReader.initialize((org.apache.hadoop.mapreduce.InputSplit)split, tac); - return recordReader; - } - - @Override - protected void validateConfiguration(Configuration conf) - { - super.validateConfiguration(conf); - - if (ConfigHelper.getInputSlicePredicate(conf) == null) - { - throw new UnsupportedOperationException("you must set the predicate with setInputSlicePredicate"); - } - } - -}
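For reference, migrating a job off the removed ColumnFamilyInputFormat mostly means swapping the input format class and dropping the slice predicate: CqlRecordReader pages through the table with CQL and hands each mapper a driver Row instead of a Thrift column map. A minimal sketch of the driver setup, assuming a local node; the keyspace, table, address, and page-size values are placeholders, not part of this commit:

import org.apache.cassandra.hadoop.ConfigHelper;
import org.apache.cassandra.hadoop.cql3.CqlConfigHelper;
import org.apache.cassandra.hadoop.cql3.CqlInputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class CqlInputSetup
{
    public static Job newJob() throws Exception
    {
        Job job = Job.getInstance(new Configuration(), "cql-input-example");
        job.setInputFormatClass(CqlInputFormat.class);

        // No more setInputSlicePredicate: point the job at a keyspace/table
        // and CqlRecordReader fetches rows with paged CQL queries.
        ConfigHelper.setInputInitialAddress(job.getConfiguration(), "127.0.0.1");
        ConfigHelper.setInputColumnFamily(job.getConfiguration(), "wordcount", "input_words");
        ConfigHelper.setInputPartitioner(job.getConfiguration(), "Murmur3Partitioner");
        CqlConfigHelper.setInputCQLPageRowSize(job.getConfiguration(), "1000");

        // Mappers are declared against driver rows instead of Thrift columns:
        //   Mapper<Long, com.datastax.driver.core.Row, K2, V2>
        return job;
    }
}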

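On the output side, the removed BulkOutputFormat/BulkRecordWriter and ColumnFamilyOutputFormat/ColumnFamilyRecordWriter map to CqlBulkOutputFormat and CqlOutputFormat respectively. With CqlOutputFormat, each reducer emit becomes one bound execution of a parameterized CQL statement: primary-key columns travel in the map key, bound values in the list. A minimal sketch in the spirit of the word_count example; the keyspace, table, and column names are placeholders, not part of this commit:

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

import org.apache.cassandra.hadoop.ConfigHelper;
import org.apache.cassandra.hadoop.cql3.CqlConfigHelper;
import org.apache.cassandra.hadoop.cql3.CqlOutputFormat;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;

public class CqlOutputSetup
{
    // Driver-side configuration for the CQL record writer.
    static void configureOutput(Job job)
    {
        job.setOutputFormatClass(CqlOutputFormat.class);
        ConfigHelper.setOutputColumnFamily(job.getConfiguration(), "wordcount", "output_words");
        ConfigHelper.setOutputInitialAddress(job.getConfiguration(), "127.0.0.1");
        ConfigHelper.setOutputPartitioner(job.getConfiguration(), "Murmur3Partitioner");
        CqlConfigHelper.setOutputCql(job.getConfiguration(), "UPDATE wordcount.output_words SET count_num = ?");
    }

    // Reducer: the map carries primary-key columns, the list carries the
    // values bound to the '?' markers of the output CQL statement.
    public static class SumReducer extends Reducer<Text, IntWritable, Map<String, ByteBuffer>, List<ByteBuffer>>
    {
        public void reduce(Text word, Iterable<IntWritable> counts, Context context) throws IOException, InterruptedException
        {
            long sum = 0;
            for (IntWritable count : counts)
                sum += count.get();

            Map<String, ByteBuffer> keys = new LinkedHashMap<>();
            keys.put("word", ByteBufferUtil.bytes(word.toString()));
            context.write(keys, Collections.singletonList(ByteBufferUtil.bytes(sum)));
        }
    }
}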