Remove deprecated legacy Hadoop code

patch by Philip Thompson; reviewed by Aleksey Yeschenko for
CASSANDRA-9353


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/446e2537
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/446e2537
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/446e2537

Branch: refs/heads/trunk
Commit: 446e2537895c15b404a74107069a12f3fc404b15
Parents: d62cd1b
Author: Philip Thompson <[email protected]>
Authored: Mon Jun 8 22:41:28 2015 +0300
Committer: Aleksey Yeschenko <[email protected]>
Committed: Mon Jun 8 22:41:28 2015 +0300

----------------------------------------------------------------------
 CHANGES.txt                                     |    1 +
 NEWS.txt                                        |   14 +
 examples/hadoop_word_count/README.txt           |   50 -
 examples/hadoop_word_count/bin/word_count       |   61 -
 .../hadoop_word_count/bin/word_count_counters   |   59 -
 examples/hadoop_word_count/bin/word_count_setup |   61 -
 examples/hadoop_word_count/build.xml            |  113 --
 examples/hadoop_word_count/conf/logback.xml     |   42 -
 examples/hadoop_word_count/ivy.xml              |   24 -
 examples/hadoop_word_count/src/WordCount.java   |  222 ---
 .../src/WordCountCounters.java                  |  104 --
 .../hadoop_word_count/src/WordCountSetup.java   |  239 ---
 .../hadoop/AbstractColumnFamilyInputFormat.java |  296 ----
 .../cassandra/hadoop/BulkOutputFormat.java      |   91 --
 .../cassandra/hadoop/BulkRecordWriter.java      |  296 ----
 .../hadoop/ColumnFamilyInputFormat.java         |  125 --
 .../hadoop/ColumnFamilyOutputFormat.java        |  183 ---
 .../hadoop/ColumnFamilyRecordReader.java        |  615 --------
 .../hadoop/ColumnFamilyRecordWriter.java        |  341 -----
 .../hadoop/cql3/CqlBulkRecordWriter.java        |   12 +-
 .../cassandra/hadoop/cql3/CqlInputFormat.java   |  277 +++-
 .../cassandra/hadoop/cql3/CqlOutputFormat.java  |    3 +
 .../cassandra/hadoop/cql3/CqlRecordWriter.java  |    5 +-
 .../cassandra/hadoop/pig/CassandraStorage.java  | 1397 ------------------
 .../org/apache/cassandra/pig/PigTestBase.java   |   27 +-
 .../pig/ThriftColumnFamilyDataTypeTest.java     |   70 +-
 .../cassandra/pig/ThriftColumnFamilyTest.java   |  320 ++--
 27 files changed, 451 insertions(+), 4597 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/446e2537/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 560bd2b..061bd5f 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0:
+ * Remove deprecated legacy Hadoop code (CASSANDRA-9353)
  * Decommissioned nodes will not rejoin the cluster (CASSANDRA-8801)
 * Change gossip stabilization to use endpoint size (CASSANDRA-9401)
  * Change default garbage collector to G1 (CASSANDRA-7486)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/446e2537/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index 9beb911..44637eb 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -13,6 +13,20 @@ restore snapshots created with the previous major version using the
 'sstableloader' tool. You can upgrade the file format of your snapshots
 using the provided 'sstableupgrade' tool.
 
+3.0
+===
+
+Upgrading
+---------
+   - Pig's CassandraStorage has been removed. Use CqlNativeStorage instead.
+   - Hadoop BulkOutputFormat and BulkRecordWriter have been removed; use
+     CqlBulkOutputFormat and CqlBulkRecordWriter instead.
+   - Hadoop ColumnFamilyInputFormat and ColumnFamilyOutputFormat have been removed;
+     use CqlInputFormat and CqlOutputFormat instead.
+   - Hadoop ColumnFamilyRecordReader and ColumnFamilyRecordWriter have been removed;
+     use CqlRecordReader and CqlRecordWriter instead.
+
+
 2.2
 ===
 

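For jobs migrating off the removed classes, the configuration change is mostly mechanical. The following is a minimal, hypothetical driver sketch (the keyspace 'wordcount', the tables 'input_words' and 'output_words', the column 'count_num', and the class name are illustrative only, not part of this patch) showing a Hadoop job wired to CqlInputFormat and CqlOutputFormat in place of the removed ColumnFamilyInputFormat/ColumnFamilyOutputFormat:

    import org.apache.cassandra.hadoop.ConfigHelper;
    import org.apache.cassandra.hadoop.cql3.CqlConfigHelper;
    import org.apache.cassandra.hadoop.cql3.CqlInputFormat;
    import org.apache.cassandra.hadoop.cql3.CqlOutputFormat;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    public class CqlWordCountDriver
    {
        public static Job configure(Configuration conf) throws Exception
        {
            Job job = new Job(conf, "wordcount-cql");
            job.setJarByClass(CqlWordCountDriver.class);

            // Input side: CqlInputFormat reads rows over the native protocol
            // instead of Thrift.
            job.setInputFormatClass(CqlInputFormat.class);
            ConfigHelper.setInputInitialAddress(job.getConfiguration(), "localhost");
            ConfigHelper.setInputPartitioner(job.getConfiguration(), "Murmur3Partitioner");
            ConfigHelper.setInputColumnFamily(job.getConfiguration(), "wordcount", "input_words");

            // Output side: each reduce output is bound to the prepared UPDATE below.
            job.setOutputFormatClass(CqlOutputFormat.class);
            ConfigHelper.setOutputInitialAddress(job.getConfiguration(), "localhost");
            ConfigHelper.setOutputPartitioner(job.getConfiguration(), "Murmur3Partitioner");
            ConfigHelper.setOutputColumnFamily(job.getConfiguration(), "wordcount", "output_words");
            CqlConfigHelper.setOutputCql(job.getConfiguration(),
                                         "UPDATE wordcount.output_words SET count_num = ?");

            return job;
        }
    }

Mapper and reducer signatures change accordingly: CqlInputFormat hands the map function com.datastax.driver.core.Row values, and CqlRecordWriter expects the reducer to emit the bound variables for the configured CQL statement.
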
http://git-wip-us.apache.org/repos/asf/cassandra/blob/446e2537/examples/hadoop_word_count/README.txt
----------------------------------------------------------------------
diff --git a/examples/hadoop_word_count/README.txt 
b/examples/hadoop_word_count/README.txt
deleted file mode 100644
index e336b89..0000000
--- a/examples/hadoop_word_count/README.txt
+++ /dev/null
@@ -1,50 +0,0 @@
-Introduction
-============
-
-WordCount hadoop example: Inserts a bunch of words across multiple rows,
-and counts them, with RandomPartitioner. The word_count_counters example sums
-the value of counter columns for a key.
-
-The scripts in bin/ assume you are running with cwd of examples/word_count.
-
-
-Running
-=======
-
-First build and start a Cassandra server with the default configuration*. 
Ensure that the Thrift
-interface is enabled, either by setting start_rpc:true in cassandra.yaml or by 
running
-`nodetool enablethrift` after startup.
-Once Cassandra has started and the Thrift interface is available, run
-
-contrib/word_count$ ant
-contrib/word_count$ bin/word_count_setup
-contrib/word_count$ bin/word_count
-contrib/word_count$ bin/word_count_counters
-
-In order to view the results in Cassandra, one can use bin/cqlsh and
-perform the following operations:
-$ bin/cqlsh localhost
-> use wordcount;
-> select * from output_words;
-
-The output of the word count can now be configured. In the bin/word_count
-file, you can specify the OUTPUT_REDUCER. The two options are 'filesystem'
-and 'cassandra'. The filesystem option outputs to the /tmp/word_count*
-directories. The cassandra option outputs to the 'output_words' column family
-in the 'wordcount' keyspace.  'cassandra' is the default.
-
-Read the code in src/ for more details.
-
-The word_count_counters example sums the counter columns for a row. The output
-is written to a text file in /tmp/word_count_counters.
-
-*It is recommended to turn off vnodes when running Cassandra with hadoop.
-This is done by setting "num_tokens: 1" in cassandra.yaml. If you want to
-point wordcount at a real cluster, modify the seed and listenaddress
-settings accordingly.
-
-
-Troubleshooting
-===============
-
-word_count uses conf/logback.xml to log to wc.out.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/446e2537/examples/hadoop_word_count/bin/word_count
----------------------------------------------------------------------
diff --git a/examples/hadoop_word_count/bin/word_count 
b/examples/hadoop_word_count/bin/word_count
deleted file mode 100755
index 34534d7..0000000
--- a/examples/hadoop_word_count/bin/word_count
+++ /dev/null
@@ -1,61 +0,0 @@
-#!/bin/sh
-
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-cwd=`dirname $0`
-
-# Cassandra class files.
-if [ ! -d $cwd/../../../build/classes/main ]; then
-    echo "Unable to locate cassandra class files" >&2
-    exit 1
-fi
-
-# word_count Jar.
-if [ ! -e $cwd/../build/word_count.jar ]; then
-    echo "Unable to locate word_count jar" >&2
-    exit 1
-fi
-
-CLASSPATH=$CLASSPATH:$cwd/../conf
-CLASSPATH=$CLASSPATH:$cwd/../build/word_count.jar
-CLASSPATH=$CLASSPATH:$cwd/../../../build/classes/main
-CLASSPATH=$CLASSPATH:$cwd/../../../build/classes/thrift
-for jar in $cwd/../build/lib/jars/*.jar; do
-    CLASSPATH=$CLASSPATH:$jar
-done
-for jar in $cwd/../../../lib/*.jar; do
-    CLASSPATH=$CLASSPATH:$jar
-done
-for jar in $cwd/../../../build/lib/jars/*.jar; do
-    CLASSPATH=$CLASSPATH:$jar
-done
-
-if [ -x $JAVA_HOME/bin/java ]; then
-    JAVA=$JAVA_HOME/bin/java
-else
-    JAVA=`which java`
-fi
-
-if [ "x$JAVA" = "x" ]; then
-    echo "Java executable not found (hint: set JAVA_HOME)" >&2
-    exit 1
-fi
-
-OUTPUT_REDUCER=cassandra
-
-#echo $CLASSPATH
-"$JAVA" -Xmx1G -ea -cp "$CLASSPATH" WordCount output_reducer=$OUTPUT_REDUCER

http://git-wip-us.apache.org/repos/asf/cassandra/blob/446e2537/examples/hadoop_word_count/bin/word_count_counters
----------------------------------------------------------------------
diff --git a/examples/hadoop_word_count/bin/word_count_counters 
b/examples/hadoop_word_count/bin/word_count_counters
deleted file mode 100755
index 122565d..0000000
--- a/examples/hadoop_word_count/bin/word_count_counters
+++ /dev/null
@@ -1,59 +0,0 @@
-#!/bin/sh
-
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-cwd=`dirname $0`
-
-# Cassandra class files.
-if [ ! -d $cwd/../../../build/classes/main ]; then
-    echo "Unable to locate cassandra class files" >&2
-    exit 1
-fi
-
-# word_count Jar.
-if [ ! -e $cwd/../build/word_count.jar ]; then
-    echo "Unable to locate word_count jar" >&2
-    exit 1
-fi
-
-CLASSPATH=$CLASSPATH:$cwd/../conf
-CLASSPATH=$CLASSPATH:$cwd/../build/word_count.jar
-CLASSPATH=$CLASSPATH:$cwd/../../../build/classes/main
-CLASSPATH=$CLASSPATH:$cwd/../../../build/classes/thrift
-for jar in $cwd/../build/lib/jars/*.jar; do
-    CLASSPATH=$CLASSPATH:$jar
-done
-for jar in $cwd/../../../lib/*.jar; do
-    CLASSPATH=$CLASSPATH:$jar
-done
-for jar in $cwd/../../../build/lib/jars/*.jar; do
-    CLASSPATH=$CLASSPATH:$jar
-done
-
-if [ -x $JAVA_HOME/bin/java ]; then
-    JAVA=$JAVA_HOME/bin/java
-else
-    JAVA=`which java`
-fi
-
-if [ "x$JAVA" = "x" ]; then
-    echo "Java executable not found (hint: set JAVA_HOME)" >&2
-    exit 1
-fi
-
-#echo $CLASSPATH
-"$JAVA" -Xmx1G -ea -cp "$CLASSPATH" WordCountCounters

http://git-wip-us.apache.org/repos/asf/cassandra/blob/446e2537/examples/hadoop_word_count/bin/word_count_setup
----------------------------------------------------------------------
diff --git a/examples/hadoop_word_count/bin/word_count_setup 
b/examples/hadoop_word_count/bin/word_count_setup
deleted file mode 100755
index 6e5650f..0000000
--- a/examples/hadoop_word_count/bin/word_count_setup
+++ /dev/null
@@ -1,61 +0,0 @@
-#!/bin/sh
-
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-cwd=`dirname $0`
-
-# Cassandra class files.
-if [ ! -d $cwd/../../../build/classes/main ]; then
-    echo "Unable to locate cassandra class files" >&2
-    exit 1
-fi
-
-# word_count Jar.
-if [ ! -e $cwd/../build/word_count.jar ]; then
-    echo "Unable to locate word_count jar" >&2
-    exit 1
-fi
-
-CLASSPATH=$CLASSPATH:$cwd/../build/word_count.jar
-CLASSPATH=$CLASSPATH:.:$cwd/../../../build/classes/main
-CLASSPATH=$CLASSPATH:.:$cwd/../../../build/classes/thrift
-for jar in $cwd/../build/lib/jars/*.jar; do
-    CLASSPATH=$CLASSPATH:$jar
-done
-for jar in $cwd/../../../lib/*.jar; do
-    CLASSPATH=$CLASSPATH:$jar
-done
-for jar in $cwd/../../../build/lib/jars/*.jar; do
-    CLASSPATH=$CLASSPATH:$jar
-done
-
-if [ -x $JAVA_HOME/bin/java ]; then
-    JAVA=$JAVA_HOME/bin/java
-else
-    JAVA=`which java`
-fi
-
-if [ "x$JAVA" = "x" ]; then
-    echo "Java executable not found (hint: set JAVA_HOME)" >&2
-    exit 1
-fi
-
-HOST=localhost
-PORT=9160
-FRAMED=true
-
-"$JAVA" -Xmx1G -ea -Dcassandra.host=$HOST -Dcassandra.port=$PORT 
-Dcassandra.framed=$FRAMED -cp "$CLASSPATH" WordCountSetup

http://git-wip-us.apache.org/repos/asf/cassandra/blob/446e2537/examples/hadoop_word_count/build.xml
----------------------------------------------------------------------
diff --git a/examples/hadoop_word_count/build.xml 
b/examples/hadoop_word_count/build.xml
deleted file mode 100644
index 939e1b3..0000000
--- a/examples/hadoop_word_count/build.xml
+++ /dev/null
@@ -1,113 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- ~ Licensed to the Apache Software Foundation (ASF) under one
- ~ or more contributor license agreements.  See the NOTICE file
- ~ distributed with this work for additional information
- ~ regarding copyright ownership.  The ASF licenses this file
- ~ to you under the Apache License, Version 2.0 (the
- ~ "License"); you may not use this file except in compliance
- ~ with the License.  You may obtain a copy of the License at
- ~
- ~    http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing,
- ~ software distributed under the License is distributed on an
- ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- ~ KIND, either express or implied.  See the License for the
- ~ specific language governing permissions and limitations
- ~ under the License.
- -->
-<project default="jar" name="word_count" xmlns:ivy="antlib:org.apache.ivy.ant">
-    <property name="cassandra.dir" value="../.." />
-    <property name="cassandra.dir.lib" value="${cassandra.dir}/lib" />
-    <property name="cassandra.classes" value="${cassandra.dir}/build/classes" 
/>
-    <property name="build.src" value="${basedir}/src" />
-    <property name="build.dir" value="${basedir}/build" />
-    <property name="ivy.lib.dir" value="${build.dir}/lib" />
-    <property name="build.classes" value="${build.dir}/classes" />
-    <property name="final.name" value="word_count" />
-    <property name="ivy.version" value="2.1.0" />
-    <property name="ivy.url"
-              value="http://repo2.maven.org/maven2/org/apache/ivy/ivy"; />
-
-    <condition property="ivy.jar.exists">
-        <available file="${build.dir}/ivy-${ivy.version}.jar" />
-    </condition>
-
-    <path id="autoivy.classpath">
-        <fileset dir="${ivy.lib.dir}">
-            <include name="**/*.jar" />
-        </fileset>
-        <pathelement location="${build.dir}/ivy-${ivy.version}.jar"/>
-    </path>
-
-    <path id="wordcount.build.classpath">
-        <fileset dir="${ivy.lib.dir}">
-            <include name="**/*.jar" />
-        </fileset>
-        <!-- cassandra dependencies -->
-        <fileset dir="${cassandra.dir.lib}">
-            <include name="**/*.jar" />
-        </fileset>
-        <fileset dir="${cassandra.dir}/build/lib/jars">
-            <include name="**/*.jar" />
-        </fileset>
-        <pathelement location="${cassandra.classes}/main" />
-        <pathelement location="${cassandra.classes}/thrift" />
-    </path>
-
-    <target name="init">
-        <mkdir dir="${build.classes}" />
-    </target>
-
-    <target depends="init,ivy-retrieve-build" name="build">
-        <javac destdir="${build.classes}">
-            <src path="${build.src}" />
-            <classpath refid="wordcount.build.classpath" />
-        </javac>
-    </target>
-
-    <target name="jar" depends="build">
-        <mkdir dir="${build.classes}/META-INF" />
-        <jar jarfile="${build.dir}/${final.name}.jar">
-           <fileset dir="${build.classes}" />
-           <fileset dir="${cassandra.classes}/main" />
-           <fileset dir="${cassandra.classes}/thrift" />
-           <fileset dir="${cassandra.dir}">
-               <include name="lib/**/*.jar" />
-           </fileset>
-           <zipfileset dir="${cassandra.dir}/build/lib/jars/" prefix="lib">
-               <include name="**/*.jar" />
-           </zipfileset>
-           <fileset file="${basedir}/cassandra.yaml" />
-        </jar>
-    </target>
-
-    <target name="clean">
-        <delete dir="${build.dir}" />
-    </target>
-
-    <!--
-        Ivy Specific targets
-            to fetch Ivy and this project's dependencies
-    -->
-       <target name="ivy-download" unless="ivy.jar.exists">
-      <echo>Downloading Ivy...</echo>
-      <mkdir dir="${build.dir}" />
-      <get src="${ivy.url}/${ivy.version}/ivy-${ivy.version}.jar"
-           dest="${build.dir}/ivy-${ivy.version}.jar" usetimestamp="true" />
-    </target>
-
-    <target name="ivy-init" depends="ivy-download" unless="ivy.initialized">
-      <mkdir dir="${ivy.lib.dir}"/>
-      <taskdef resource="org/apache/ivy/ant/antlib.xml"
-               uri="antlib:org.apache.ivy.ant"
-               classpathref="autoivy.classpath"/>
-      <property name="ivy.initialized" value="true"/>
-    </target>
-
-    <target name="ivy-retrieve-build" depends="ivy-init">
-      <ivy:retrieve type="jar,source" sync="true"
-             pattern="${ivy.lib.dir}/[type]s/[artifact]-[revision].[ext]" />
-    </target>
-</project>

http://git-wip-us.apache.org/repos/asf/cassandra/blob/446e2537/examples/hadoop_word_count/conf/logback.xml
----------------------------------------------------------------------
diff --git a/examples/hadoop_word_count/conf/logback.xml 
b/examples/hadoop_word_count/conf/logback.xml
deleted file mode 100644
index 443bd1c..0000000
--- a/examples/hadoop_word_count/conf/logback.xml
+++ /dev/null
@@ -1,42 +0,0 @@
-<!--
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements.  See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership.  The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License.  You may obtain a copy of the License at
-
-   http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied.  See the License for the
- specific language governing permissions and limitations
- under the License.
--->
-
-<configuration scan="true">
-
-  <jmxConfigurator />
-
-  <appender name="FILE" class="ch.qos.logback.core.FileAppender">
-    <file>wc.out</file>
-    <encoder>
-      <pattern>%-5level [%thread] %date{ISO8601} %F:%L - %msg%n</pattern>
-    </encoder>
-  </appender>
-
-  <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
-    <encoder>
-      <pattern>%-5level %date{HH:mm:ss,SSS} %msg%n</pattern>
-    </encoder>
-  </appender>
-
-  <root level="INFO">
-    <appender-ref ref="FILE" />
-    <appender-ref ref="STDOUT" />
-  </root>
-
-</configuration>

http://git-wip-us.apache.org/repos/asf/cassandra/blob/446e2537/examples/hadoop_word_count/ivy.xml
----------------------------------------------------------------------
diff --git a/examples/hadoop_word_count/ivy.xml 
b/examples/hadoop_word_count/ivy.xml
deleted file mode 100644
index 2016eb8..0000000
--- a/examples/hadoop_word_count/ivy.xml
+++ /dev/null
@@ -1,24 +0,0 @@
-<!--
- ~ Licensed to the Apache Software Foundation (ASF) under one
- ~ or more contributor license agreements.  See the NOTICE file
- ~ distributed with this work for additional information
- ~ regarding copyright ownership.  The ASF licenses this file
- ~ to you under the Apache License, Version 2.0 (the
- ~ "License"); you may not use this file except in compliance
- ~ with the License.  You may obtain a copy of the License at
- ~
- ~    http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing,
- ~ software distributed under the License is distributed on an
- ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- ~ KIND, either express or implied.  See the License for the
- ~ specific language governing permissions and limitations
- ~ under the License.
- -->
-<ivy-module version="2.0">
-    <info organisation="apache-cassandra" module="word-count"/>
-    <dependencies>
-        <dependency org="org.apache.hadoop" name="hadoop-core" rev="1.0.3"/>
-    </dependencies>
-</ivy-module>

http://git-wip-us.apache.org/repos/asf/cassandra/blob/446e2537/examples/hadoop_word_count/src/WordCount.java
----------------------------------------------------------------------
diff --git a/examples/hadoop_word_count/src/WordCount.java 
b/examples/hadoop_word_count/src/WordCount.java
deleted file mode 100644
index d092f1f..0000000
--- a/examples/hadoop_word_count/src/WordCount.java
+++ /dev/null
@@ -1,222 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.*;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.hadoop.*;
-import org.apache.cassandra.thrift.*;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-
-/**
- * This counts the occurrences of words in ColumnFamily Standard1, that has a 
single column (that we care about)
- * "text" containing a sequence of words.
- *
- * For each word, we output the total number of occurrences across all texts.
- *
- * When outputting to Cassandra, we write the word counts as a {word, count} 
column/value pair,
- * with a row key equal to the name of the source column we read the words 
from.
- */
-public class WordCount extends Configured implements Tool
-{
-    private static final Logger logger = 
LoggerFactory.getLogger(WordCount.class);
-
-    static final String KEYSPACE = "wordcount";
-    static final String COLUMN_FAMILY = "input_words";
-
-    static final String OUTPUT_REDUCER_VAR = "output_reducer";
-    static final String OUTPUT_COLUMN_FAMILY = "output_words";
-    private static final String OUTPUT_PATH_PREFIX = "/tmp/word_count";
-
-    private static final String CONF_COLUMN_NAME = "columnname";
-
-    public static void main(String[] args) throws Exception
-    {
-        // Let ToolRunner handle generic command-line options
-        ToolRunner.run(new Configuration(), new WordCount(), args);
-        System.exit(0);
-    }
-
-    public static class TokenizerMapper extends Mapper<ByteBuffer, 
SortedMap<ByteBuffer, ColumnFamilyRecordReader.Column>, Text, IntWritable>
-    {
-        private final static IntWritable one = new IntWritable(1);
-        private Text word = new Text();
-        private ByteBuffer sourceColumn;
-
-        protected void setup(org.apache.hadoop.mapreduce.Mapper.Context 
context)
-        throws IOException, InterruptedException
-        {
-        }
-
-        public void map(ByteBuffer key, SortedMap<ByteBuffer, 
ColumnFamilyRecordReader.Column> columns, Context context) throws IOException, 
InterruptedException
-        {
-            for (ColumnFamilyRecordReader.Column column : columns.values())
-            {
-                String name  = ByteBufferUtil.string(column.name);
-                String value = null;
-                
-                if (name.contains("int"))
-                    value = String.valueOf(ByteBufferUtil.toInt(column.value));
-                else
-                    value = ByteBufferUtil.string(column.value);
-                               
-                logger.debug("read {}:{}={} from {}",
-                             new Object[] {ByteBufferUtil.string(key), name, 
value, context.getInputSplit()});
-
-                StringTokenizer itr = new StringTokenizer(value);
-                while (itr.hasMoreTokens())
-                {
-                    word.set(itr.nextToken());
-                    context.write(word, one);
-                }
-            }
-        }
-    }
-
-    public static class ReducerToFilesystem extends Reducer<Text, IntWritable, 
Text, IntWritable>
-    {
-        public void reduce(Text key, Iterable<IntWritable> values, Context 
context) throws IOException, InterruptedException
-        {
-            int sum = 0;
-            for (IntWritable val : values)
-                sum += val.get();
-            context.write(key, new IntWritable(sum));
-        }
-    }
-
-    public static class ReducerToCassandra extends Reducer<Text, IntWritable, 
ByteBuffer, List<Mutation>>
-    {
-        private ByteBuffer outputKey;
-
-        protected void setup(org.apache.hadoop.mapreduce.Reducer.Context 
context)
-        throws IOException, InterruptedException
-        {
-            outputKey = 
ByteBufferUtil.bytes(context.getConfiguration().get(CONF_COLUMN_NAME));
-        }
-
-        public void reduce(Text word, Iterable<IntWritable> values, Context 
context) throws IOException, InterruptedException
-        {
-            int sum = 0;
-            for (IntWritable val : values)
-                sum += val.get();
-            context.write(outputKey, 
Collections.singletonList(getMutation(word, sum)));
-        }
-
-        private static Mutation getMutation(Text word, int sum)
-        {
-            org.apache.cassandra.thrift.Column c = new 
org.apache.cassandra.thrift.Column();
-            c.setName(Arrays.copyOf(word.getBytes(), word.getLength()));
-            c.setValue(ByteBufferUtil.bytes(sum));
-            c.setTimestamp(System.currentTimeMillis());
-
-            Mutation m = new Mutation();
-            m.setColumn_or_supercolumn(new ColumnOrSuperColumn());
-            m.column_or_supercolumn.setColumn(c);
-            return m;
-        }
-    }
-
-    public int run(String[] args) throws Exception
-    {
-        String outputReducerType = "filesystem";
-        if (args != null && args[0].startsWith(OUTPUT_REDUCER_VAR))
-        {
-            String[] s = args[0].split("=");
-            if (s != null && s.length == 2)
-                outputReducerType = s[1];
-        }
-        logger.info("output reducer type: " + outputReducerType);
-
-        // use a smaller page size that doesn't divide the row count evenly to 
exercise the paging logic better
-        ConfigHelper.setRangeBatchSize(getConf(), 99);
-
-        for (int i = 0; i < WordCountSetup.TEST_COUNT; i++)
-        {
-            String columnName = "text" + i;
-
-            Job job = new Job(getConf(), "wordcount");
-            job.setJarByClass(WordCount.class);
-            job.setMapperClass(TokenizerMapper.class);
-
-            if (outputReducerType.equalsIgnoreCase("filesystem"))
-            {
-                job.setCombinerClass(ReducerToFilesystem.class);
-                job.setReducerClass(ReducerToFilesystem.class);
-                job.setOutputKeyClass(Text.class);
-                job.setOutputValueClass(IntWritable.class);
-                FileOutputFormat.setOutputPath(job, new 
Path(OUTPUT_PATH_PREFIX + i));
-            }
-            else
-            {
-                job.setReducerClass(ReducerToCassandra.class);
-
-                job.setMapOutputKeyClass(Text.class);
-                job.setMapOutputValueClass(IntWritable.class);
-                job.setOutputKeyClass(ByteBuffer.class);
-                job.setOutputValueClass(List.class);
-
-                job.setOutputFormatClass(ColumnFamilyOutputFormat.class);
-
-                ConfigHelper.setOutputColumnFamily(job.getConfiguration(), 
KEYSPACE, OUTPUT_COLUMN_FAMILY);
-                job.getConfiguration().set(CONF_COLUMN_NAME, "sum");
-            }
-
-            job.setInputFormatClass(ColumnFamilyInputFormat.class);
-
-            ConfigHelper.setInputRpcPort(job.getConfiguration(), "9160");
-            ConfigHelper.setInputInitialAddress(job.getConfiguration(), 
"localhost");
-            ConfigHelper.setInputPartitioner(job.getConfiguration(), 
"Murmur3Partitioner");
-            ConfigHelper.setInputColumnFamily(job.getConfiguration(), 
KEYSPACE, COLUMN_FAMILY);
-            SlicePredicate predicate = new 
SlicePredicate().setColumn_names(Arrays.asList(ByteBufferUtil.bytes(columnName)));
-            ConfigHelper.setInputSlicePredicate(job.getConfiguration(), 
predicate);
-
-            if (i == 4)
-            {
-                IndexExpression expr = new 
IndexExpression(ByteBufferUtil.bytes("int4"), IndexOperator.EQ, 
ByteBufferUtil.bytes(0));
-                ConfigHelper.setInputRange(job.getConfiguration(), 
Arrays.asList(expr));
-            }
-
-            if (i == 5)
-            {
-                // this will cause the predicate to be ignored in favor of 
scanning everything as a wide row
-                ConfigHelper.setInputColumnFamily(job.getConfiguration(), 
KEYSPACE, COLUMN_FAMILY, true);
-            }
-
-            ConfigHelper.setOutputInitialAddress(job.getConfiguration(), 
"localhost");
-            ConfigHelper.setOutputPartitioner(job.getConfiguration(), 
"Murmur3Partitioner");
-
-            job.waitForCompletion(true);
-        }
-        return 0;
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/446e2537/examples/hadoop_word_count/src/WordCountCounters.java
----------------------------------------------------------------------
diff --git a/examples/hadoop_word_count/src/WordCountCounters.java 
b/examples/hadoop_word_count/src/WordCountCounters.java
deleted file mode 100644
index 98c8579..0000000
--- a/examples/hadoop_word_count/src/WordCountCounters.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.SortedMap;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.hadoop.ColumnFamilyInputFormat;
-import org.apache.cassandra.hadoop.ColumnFamilyRecordReader;
-import org.apache.cassandra.hadoop.ConfigHelper;
-import org.apache.cassandra.thrift.SlicePredicate;
-import org.apache.cassandra.thrift.SliceRange;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-
-/**
- * This sums the word count stored in the input_words_count ColumnFamily for 
the key "key-if-verse1".
- *
- * Output is written to a text file.
- */
-public class WordCountCounters extends Configured implements Tool
-{
-    private static final Logger logger = 
LoggerFactory.getLogger(WordCountCounters.class);
-
-    static final String COUNTER_COLUMN_FAMILY = "input_words_count";
-    private static final String OUTPUT_PATH_PREFIX = 
"/tmp/word_count_counters";
-
-
-    public static void main(String[] args) throws Exception
-    {
-        // Let ToolRunner handle generic command-line options
-        ToolRunner.run(new Configuration(), new WordCountCounters(), args);
-        System.exit(0);
-    }
-
-    public static class SumMapper extends Mapper<ByteBuffer, 
SortedMap<ByteBuffer, ColumnFamilyRecordReader.Column>, Text, LongWritable>
-    {
-        public void map(ByteBuffer key, SortedMap<ByteBuffer, 
ColumnFamilyRecordReader.Column> columns, Context context) throws IOException, 
InterruptedException
-        {
-            long sum = 0;
-            for (ColumnFamilyRecordReader.Column column : columns.values())
-            {
-                logger.debug("read " + key + ":" + 
ByteBufferUtil.string(column.name) + " from " + context.getInputSplit());
-                sum += ByteBufferUtil.toLong(column.value);
-            }
-            context.write(new Text(ByteBufferUtil.string(key)), new 
LongWritable(sum));
-        }
-    }
-
-    public int run(String[] args) throws Exception
-    {
-        Job job = new Job(getConf(), "wordcountcounters");
-        job.setJarByClass(WordCountCounters.class);
-        job.setMapperClass(SumMapper.class);
-
-        job.setOutputKeyClass(Text.class);
-        job.setOutputValueClass(LongWritable.class);
-        FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH_PREFIX));
-
-
-        job.setInputFormatClass(ColumnFamilyInputFormat.class);
-
-        ConfigHelper.setInputRpcPort(job.getConfiguration(), "9160");
-        ConfigHelper.setInputInitialAddress(job.getConfiguration(), 
"localhost");
-        ConfigHelper.setInputPartitioner(job.getConfiguration(), 
"org.apache.cassandra.dht.Murmur3Partitioner");
-        ConfigHelper.setInputColumnFamily(job.getConfiguration(), 
WordCount.KEYSPACE, WordCountCounters.COUNTER_COLUMN_FAMILY);
-        SlicePredicate predicate = new SlicePredicate().setSlice_range(
-                                                                        new 
SliceRange().
-                                                                        
setStart(ByteBufferUtil.EMPTY_BYTE_BUFFER).
-                                                                        
setFinish(ByteBufferUtil.EMPTY_BYTE_BUFFER).
-                                                                        
setCount(100));
-        ConfigHelper.setInputSlicePredicate(job.getConfiguration(), predicate);
-
-        job.waitForCompletion(true);
-        return 0;
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/446e2537/examples/hadoop_word_count/src/WordCountSetup.java
----------------------------------------------------------------------
diff --git a/examples/hadoop_word_count/src/WordCountSetup.java 
b/examples/hadoop_word_count/src/WordCountSetup.java
deleted file mode 100644
index 0ef5341..0000000
--- a/examples/hadoop_word_count/src/WordCountSetup.java
+++ /dev/null
@@ -1,239 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-import java.nio.ByteBuffer;
-import java.util.*;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.cassandra.thrift.*;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.thrift.TException;
-import org.apache.thrift.protocol.TBinaryProtocol;
-import org.apache.thrift.protocol.TProtocol;
-import org.apache.thrift.transport.TFramedTransport;
-import org.apache.thrift.transport.TSocket;
-import org.apache.thrift.transport.TTransport;
-import org.apache.thrift.transport.TTransportException;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.util.concurrent.Uninterruptibles;
-
-public class WordCountSetup
-{
-    private static final Logger logger = 
LoggerFactory.getLogger(WordCountSetup.class);
-
-    public static final int TEST_COUNT = 6;
-
-    public static void main(String[] args) throws Exception
-    {
-        Cassandra.Iface client = createConnection();
-
-        setupKeyspace(client);
-
-        client.set_keyspace(WordCount.KEYSPACE);
-
-        Map<ByteBuffer, Map<String, List<Mutation>>> mutationMap;
-        Column c;
-
-        // text0: no rows
-
-        // text1: 1 row, 1 word
-        c = new Column()
-            .setName(ByteBufferUtil.bytes("text1"))
-            .setValue(ByteBufferUtil.bytes("word1"))
-            .setTimestamp(System.currentTimeMillis());
-        mutationMap = getMutationMap(ByteBufferUtil.bytes("key0"), 
WordCount.COLUMN_FAMILY, c);
-        client.batch_mutate(mutationMap, ConsistencyLevel.ONE);
-        logger.info("added text1");
-
-        // text1: 1 row, 2 word
-        c = new Column()
-            .setName(ByteBufferUtil.bytes("text2"))
-            .setValue(ByteBufferUtil.bytes("word1 word2"))
-            .setTimestamp(System.currentTimeMillis());
-        mutationMap = getMutationMap(ByteBufferUtil.bytes("key0"), 
WordCount.COLUMN_FAMILY, c);
-        client.batch_mutate(mutationMap, ConsistencyLevel.ONE);
-        logger.info("added text2");
-
-        // text3: 1000 rows, 1 word
-        mutationMap = new HashMap<ByteBuffer, Map<String, List<Mutation>>>();
-        for (int i = 0; i < 1000; i++)
-        {
-            c = new Column()
-                .setName(ByteBufferUtil.bytes("text3"))
-                .setValue(ByteBufferUtil.bytes("word1"))
-                .setTimestamp(System.currentTimeMillis());
-            addToMutationMap(mutationMap, ByteBufferUtil.bytes("key" + i), 
WordCount.COLUMN_FAMILY, c);
-        }
-        client.batch_mutate(mutationMap, ConsistencyLevel.ONE);
-        logger.info("added text3");
-
-        // text4: 1000 rows, 1 word, one column to filter on
-        mutationMap = new HashMap<ByteBuffer, Map<String, List<Mutation>>>();
-        for (int i = 0; i < 1000; i++)
-        {
-            Column c1 = new Column()
-                       .setName(ByteBufferUtil.bytes("text4"))
-                       .setValue(ByteBufferUtil.bytes("word1"))
-                       .setTimestamp(System.currentTimeMillis());
-            Column c2 = new Column()
-                       .setName(ByteBufferUtil.bytes("int4"))
-                       .setValue(ByteBufferUtil.bytes(i % 4))
-                       .setTimestamp(System.currentTimeMillis());
-            ByteBuffer key = ByteBufferUtil.bytes("key" + i);
-            addToMutationMap(mutationMap, key, WordCount.COLUMN_FAMILY, c1);
-            addToMutationMap(mutationMap, key, WordCount.COLUMN_FAMILY, c2);
-        }
-        client.batch_mutate(mutationMap, ConsistencyLevel.ONE);
-        logger.info("added text4");
-
-        // sentence data for the counters
-        final ByteBuffer key = ByteBufferUtil.bytes("key-if-verse1");
-        final ColumnParent colParent = new 
ColumnParent(WordCountCounters.COUNTER_COLUMN_FAMILY);
-        for (String sentence : sentenceData())
-        {
-            client.add(key,
-                       colParent,
-                       new CounterColumn(ByteBufferUtil.bytes(sentence),
-                                         (long) sentence.split("\\s").length),
-                       ConsistencyLevel.ONE);
-        }
-        logger.info("added key-if-verse1");
-
-        System.exit(0);
-    }
-
-    private static Map<ByteBuffer, Map<String, List<Mutation>>> 
getMutationMap(ByteBuffer key, String cf, Column c)
-    {
-        Map<ByteBuffer, Map<String, List<Mutation>>> mutationMap = new 
HashMap<ByteBuffer, Map<String, List<Mutation>>>();
-        addToMutationMap(mutationMap, key, cf, c);
-        return mutationMap;
-    }
-
-    private static void addToMutationMap(Map<ByteBuffer, Map<String, 
List<Mutation>>> mutationMap, ByteBuffer key, String cf, Column c)
-    {
-        Map<String, List<Mutation>> cfMutation = mutationMap.get(key);
-        if (cfMutation == null)
-        {
-            cfMutation = new HashMap<String, List<Mutation>>();
-            mutationMap.put(key, cfMutation);
-        }
-
-        List<Mutation> mutationList = cfMutation.get(cf);
-        if (mutationList == null)
-        {
-            mutationList = new ArrayList<Mutation>();
-            cfMutation.put(cf, mutationList);
-        }
-
-        ColumnOrSuperColumn cc = new ColumnOrSuperColumn();
-        Mutation m = new Mutation();
-
-        cc.setColumn(c);
-        m.setColumn_or_supercolumn(cc);
-        mutationList.add(m);
-    }
-
-    private static void setupKeyspace(Cassandra.Iface client) throws 
TException, InvalidRequestException, SchemaDisagreementException
-    {
-        List<CfDef> cfDefList = new ArrayList<CfDef>();
-        CfDef input = new CfDef(WordCount.KEYSPACE, WordCount.COLUMN_FAMILY);
-        input.setComparator_type("AsciiType");
-        input.setColumn_metadata(Arrays.asList(new 
ColumnDef(ByteBufferUtil.bytes("text1"), "AsciiType"),
-                                               new 
ColumnDef(ByteBufferUtil.bytes("text2"), "AsciiType"),
-                                               new 
ColumnDef(ByteBufferUtil.bytes("text3"), "AsciiType"),
-                                               new 
ColumnDef(ByteBufferUtil.bytes("text4"), "AsciiType"),
-                                               new 
ColumnDef(ByteBufferUtil.bytes("int4"), 
"Int32Type").setIndex_name("int4idx").setIndex_type(IndexType.KEYS)));
-        cfDefList.add(input);
-
-        CfDef output = new CfDef(WordCount.KEYSPACE, 
WordCount.OUTPUT_COLUMN_FAMILY);
-        output.setComparator_type("AsciiType");
-        output.setDefault_validation_class("Int32Type");
-        cfDefList.add(output);
-
-        CfDef counterInput = new CfDef(WordCount.KEYSPACE, 
WordCountCounters.COUNTER_COLUMN_FAMILY);
-        counterInput.setComparator_type("UTF8Type");
-        counterInput.setDefault_validation_class("CounterColumnType");
-        cfDefList.add(counterInput);
-
-        KsDef ksDef = new KsDef(WordCount.KEYSPACE, 
"org.apache.cassandra.locator.SimpleStrategy", cfDefList);
-        ksDef.putToStrategy_options("replication_factor", "1");
-        client.system_add_keyspace(ksDef);
-
-        int magnitude = getNumberOfHosts(client);
-        Uninterruptibles.sleepUninterruptibly(magnitude, TimeUnit.SECONDS);
-    }
-
-    private static int getNumberOfHosts(Cassandra.Iface client)
-            throws InvalidRequestException, UnavailableException, 
TimedOutException, TException
-    {
-        client.set_keyspace("system");
-        SlicePredicate predicate = new SlicePredicate();
-        SliceRange sliceRange = new SliceRange();
-        sliceRange.setStart(new byte[0]);
-        sliceRange.setFinish(new byte[0]);
-        predicate.setSlice_range(sliceRange);
-
-        KeyRange keyrRange = new KeyRange();
-        keyrRange.setStart_key(new byte[0]);
-        keyrRange.setEnd_key(new byte[0]);
-        //keyrRange.setCount(100);
-
-        ColumnParent parent = new ColumnParent("peers");
-
-        List<KeySlice> ls = client.get_range_slices(parent, predicate, 
keyrRange, ConsistencyLevel.ONE);
-
-        return ls.size();
-    }
-
-    private static Cassandra.Iface createConnection() throws 
TTransportException
-    {
-        if (System.getProperty("cassandra.host") == null || 
System.getProperty("cassandra.port") == null)
-        {
-            logger.warn("cassandra.host or cassandra.port is not defined, 
using default");
-        }
-        return createConnection(System.getProperty("cassandra.host", 
"localhost"),
-                                
Integer.valueOf(System.getProperty("cassandra.port", "9160")));
-    }
-
-    private static Cassandra.Client createConnection(String host, Integer 
port) throws TTransportException
-    {
-        TSocket socket = new TSocket(host, port);
-        TTransport trans = new TFramedTransport(socket);
-        trans.open();
-        TProtocol protocol = new TBinaryProtocol(trans);
-
-        return new Cassandra.Client(protocol);
-    }
-
-    private static String[] sentenceData()
-    {   // Public domain context, source 
http://en.wikisource.org/wiki/If%E2%80%94
-        return new String[]{
-                "If you can keep your head when all about you",
-                "Are losing theirs and blaming it on you",
-                "If you can trust yourself when all men doubt you,",
-                "But make allowance for their doubting too:",
-                "If you can wait and not be tired by waiting,",
-                "Or being lied about, don’t deal in lies,",
-                "Or being hated, don’t give way to hating,",
-                "And yet don’t look too good, nor talk too wise;"
-        };
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/446e2537/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java 
b/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java
deleted file mode 100644
index 4dd53ff..0000000
--- a/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java
+++ /dev/null
@@ -1,296 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.hadoop;
-
-import java.io.IOException;
-import java.util.*;
-import java.util.concurrent.*;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.datastax.driver.core.Host;
-import com.datastax.driver.core.Metadata;
-import com.datastax.driver.core.ResultSet;
-import com.datastax.driver.core.Row;
-import com.datastax.driver.core.Session;
-import com.datastax.driver.core.TokenRange;
-import org.apache.cassandra.db.SystemKeyspace;
-import org.apache.cassandra.dht.ByteOrderedPartitioner;
-import org.apache.cassandra.dht.IPartitioner;
-import org.apache.cassandra.dht.OrderPreservingPartitioner;
-import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.hadoop.cql3.*;
-import org.apache.cassandra.thrift.KeyRange;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapreduce.*;
-
-public abstract class AbstractColumnFamilyInputFormat<K, Y> extends 
InputFormat<K, Y> implements org.apache.hadoop.mapred.InputFormat<K, Y>
-{
-    private static final Logger logger = 
LoggerFactory.getLogger(AbstractColumnFamilyInputFormat.class);
-
-    public static final String MAPRED_TASK_ID = "mapred.task.id";
-    // The simple fact that we need this is because the old Hadoop API wants 
us to "write"
-    // to the key and value whereas the new asks for it.
-    // I choose 8kb as the default max key size (instantiated only once), but 
you can
-    // override it in your jobConf with this setting.
-    public static final String CASSANDRA_HADOOP_MAX_KEY_SIZE = 
"cassandra.hadoop.max_key_size";
-    public static final int    CASSANDRA_HADOOP_MAX_KEY_SIZE_DEFAULT = 8192;
-
-    private String keyspace;
-    private String cfName;
-    private IPartitioner partitioner;
-    private Session session;
-
-    protected void validateConfiguration(Configuration conf)
-    {
-        if (ConfigHelper.getInputKeyspace(conf) == null || 
ConfigHelper.getInputColumnFamily(conf) == null)
-        {
-            throw new UnsupportedOperationException("you must set the keyspace 
and table with setInputColumnFamily()");
-        }
-        if (ConfigHelper.getInputInitialAddress(conf) == null)
-            throw new UnsupportedOperationException("You must set the initial 
output address to a Cassandra node with setInputInitialAddress");
-        if (ConfigHelper.getInputPartitioner(conf) == null)
-            throw new UnsupportedOperationException("You must set the 
Cassandra partitioner class with setInputPartitioner");
-    }
-
-    public List<InputSplit> getSplits(JobContext context) throws IOException
-    {
-        Configuration conf = HadoopCompat.getConfiguration(context);
-
-        validateConfiguration(conf);
-
-        keyspace = ConfigHelper.getInputKeyspace(conf);
-        cfName = ConfigHelper.getInputColumnFamily(conf);
-        partitioner = ConfigHelper.getInputPartitioner(conf);
-        logger.debug("partitioner is {}", partitioner);
-
-        // canonical ranges and nodes holding replicas
-        Map<TokenRange, Set<Host>> masterRangeNodes = getRangeMap(conf, 
keyspace);
-
-        // canonical ranges, split into pieces, fetching the splits in parallel
-        ExecutorService executor = new ThreadPoolExecutor(0, 128, 60L, 
TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
-        List<InputSplit> splits = new ArrayList<>();
-
-        try
-        {
-            List<Future<List<InputSplit>>> splitfutures = new ArrayList<>();
-            KeyRange jobKeyRange = ConfigHelper.getInputKeyRange(conf);
-            Range<Token> jobRange = null;
-            if (jobKeyRange != null)
-            {
-                if (jobKeyRange.start_key != null)
-                {
-                    if (!partitioner.preservesOrder())
-                        throw new UnsupportedOperationException("KeyRange 
based on keys can only be used with a order preserving partitioner");
-                    if (jobKeyRange.start_token != null)
-                        throw new IllegalArgumentException("only start_key 
supported");
-                    if (jobKeyRange.end_token != null)
-                        throw new IllegalArgumentException("only start_key 
supported");
-                    jobRange = new 
Range<>(partitioner.getToken(jobKeyRange.start_key),
-                                           
partitioner.getToken(jobKeyRange.end_key));
-                }
-                else if (jobKeyRange.start_token != null)
-                {
-                    jobRange = new 
Range<>(partitioner.getTokenFactory().fromString(jobKeyRange.start_token),
-                                           
partitioner.getTokenFactory().fromString(jobKeyRange.end_token));
-                }
-                else
-                {
-                    logger.warn("ignoring jobKeyRange specified without 
start_key or start_token");
-                }
-            }
-
-            session = 
CqlConfigHelper.getInputCluster(ConfigHelper.getInputInitialAddress(conf).split(","),
 conf).connect();
-            Metadata metadata = session.getCluster().getMetadata();
-
-            for (TokenRange range : masterRangeNodes.keySet())
-            {
-                if (jobRange == null)
-                {
-                    // for each tokenRange, pick a live owner and ask it to 
compute bite-sized splits
-                    splitfutures.add(executor.submit(new SplitCallable(range, 
masterRangeNodes.get(range), conf)));
-                }
-                else
-                {
-                    TokenRange jobTokenRange = rangeToTokenRange(metadata, 
jobRange);
-                    if (range.intersects(jobTokenRange))
-                    {
-                        for (TokenRange intersection: 
range.intersectWith(jobTokenRange))
-                        {
-                            // for each tokenRange, pick a live owner and ask 
it to compute bite-sized splits
-                            splitfutures.add(executor.submit(new 
SplitCallable(intersection,  masterRangeNodes.get(range), conf)));
-                        }
-                    }
-                }
-            }
-
-            // wait until we have all the results back
-            for (Future<List<InputSplit>> futureInputSplits : splitfutures)
-            {
-                try
-                {
-                    splits.addAll(futureInputSplits.get());
-                }
-                catch (Exception e)
-                {
-                    throw new IOException("Could not get input splits", e);
-                }
-            }
-        }
-        finally
-        {
-            executor.shutdownNow();
-        }
-
-        assert splits.size() > 0;
-        Collections.shuffle(splits, new Random(System.nanoTime()));
-        return splits;
-    }
-
-    private TokenRange rangeToTokenRange(Metadata metadata, Range<Token> range)
-    {
-        return 
metadata.newTokenRange(metadata.newToken(partitioner.getTokenFactory().toString(range.left)),
-                
metadata.newToken(partitioner.getTokenFactory().toString(range.right)));
-    }
-
-    /**
-     * Gets a token tokenRange and splits it up according to the suggested
-     * size into input splits that Hadoop can use.
-     */
-    class SplitCallable implements Callable<List<InputSplit>>
-    {
-
-        private final TokenRange tokenRange;
-        private final Set<Host> hosts;
-        private final Configuration conf;
-
-        public SplitCallable(TokenRange tr, Set<Host> hosts, Configuration 
conf)
-        {
-            this.tokenRange = tr;
-            this.hosts = hosts;
-            this.conf = conf;
-        }
-
-        public List<InputSplit> call() throws Exception
-        {
-            ArrayList<InputSplit> splits = new ArrayList<>();
-            Map<TokenRange, Long> subSplits;
-            subSplits = getSubSplits(keyspace, cfName, tokenRange, conf);
-            // turn the sub-ranges into InputSplits
-            String[] endpoints = new String[hosts.size()];
-
-            // hadoop needs hostname, not ip
-            int endpointIndex = 0;
-            for (Host endpoint : hosts)
-                endpoints[endpointIndex++] = endpoint.getAddress().getHostName();
-
-            boolean partitionerIsOpp = partitioner instanceof OrderPreservingPartitioner || partitioner instanceof ByteOrderedPartitioner;
-
-            for (TokenRange subSplit : subSplits.keySet())
-            {
-                List<TokenRange> ranges = subSplit.unwrap();
-                for (TokenRange subrange : ranges)
-                {
-                    ColumnFamilySplit split =
-                            new ColumnFamilySplit(
-                                    partitionerIsOpp ?
-                                            subrange.getStart().toString().substring(2) : subrange.getStart().toString(),
-                                    partitionerIsOpp ?
-                                            subrange.getEnd().toString().substring(2) : subrange.getStart().toString(),
-                                    subSplits.get(subSplit),
-                                    endpoints);
-
-                    logger.debug("adding {}", split);
-                    splits.add(split);
-                }
-            }
-            return splits;
-        }
-    }
-
-    private Map<TokenRange, Long> getSubSplits(String keyspace, String cfName, TokenRange range, Configuration conf) throws IOException
-    {
-        int splitSize = ConfigHelper.getInputSplitSize(conf);
-        try
-        {
-            return describeSplits(keyspace, cfName, range, splitSize);
-        }
-        catch (Exception e)
-        {
-            throw new RuntimeException(e);
-        }
-    }
-
-    private Map<TokenRange, Set<Host>> getRangeMap(Configuration conf, String keyspace)
-    {
-        try (Session session = CqlConfigHelper.getInputCluster(ConfigHelper.getInputInitialAddress(conf).split(","), conf).connect())
-        {
-            Map<TokenRange, Set<Host>> map = new HashMap<>();
-            Metadata metadata = session.getCluster().getMetadata();
-            for (TokenRange tokenRange : metadata.getTokenRanges())
-                map.put(tokenRange, metadata.getReplicas('"' + keyspace + '"', tokenRange));
-            return map;
-        }
-    }
-
-    private Map<TokenRange, Long> describeSplits(String keyspace, String table, TokenRange tokenRange, int splitSize)
-    {
-        String query = String.format("SELECT mean_partition_size, partitions_count " +
-                                     "FROM %s.%s " +
-                                     "WHERE keyspace_name = ? AND table_name = ? AND range_start = ? AND range_end = ?",
-                                     SystemKeyspace.NAME,
-                                     SystemKeyspace.SIZE_ESTIMATES);
-
-        ResultSet resultSet = session.execute(query, keyspace, table, tokenRange.getStart().toString(), tokenRange.getEnd().toString());
-
-        Row row = resultSet.one();
-        // If we have no data on this split, return the full split i.e., do not sub-split
-        // Assume smallest granularity of partition count available from CASSANDRA-7688
-        if (row == null)
-        {
-            Map<TokenRange, Long> wrappedTokenRange = new HashMap<>();
-            wrappedTokenRange.put(tokenRange, (long) 128);
-            return wrappedTokenRange;
-        }
-
-        long meanPartitionSize = row.getLong("mean_partition_size");
-        long partitionCount = row.getLong("partitions_count");
-
-        int splitCount = (int)((meanPartitionSize * partitionCount) / splitSize);
-        List<TokenRange> splitRanges = tokenRange.splitEvenly(splitCount);
-        Map<TokenRange, Long> rangesWithLength = new HashMap<>();
-        for (TokenRange range : splitRanges)
-            rangesWithLength.put(range, partitionCount/splitCount);
-
-        return rangesWithLength;
-    }
-
-    // Old Hadoop API
-    public org.apache.hadoop.mapred.InputSplit[] getSplits(JobConf jobConf, int numSplits) throws IOException
-    {
-        TaskAttemptContext tac = HadoopCompat.newTaskAttemptContext(jobConf, new TaskAttemptID());
-        List<org.apache.hadoop.mapreduce.InputSplit> newInputSplits = this.getSplits(tac);
-        org.apache.hadoop.mapred.InputSplit[] oldInputSplits = new org.apache.hadoop.mapred.InputSplit[newInputSplits.size()];
-        for (int i = 0; i < newInputSplits.size(); i++)
-            oldInputSplits[i] = (ColumnFamilySplit)newInputSplits.get(i);
-        return oldInputSplits;
-    }
-}
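
The estimate-driven subdivision removed above is kept by the CQL input path, so the arithmetic in describeSplits is still worth understanding. A minimal, self-contained sketch of that arithmetic follows; every value is hypothetical and not taken from this patch.

    // Illustrative sketch of the describeSplits split-count arithmetic; all values hypothetical.
    public class SplitCountSketch
    {
        public static void main(String[] args)
        {
            long meanPartitionSize = 1024;      // as reported in system.size_estimates (hypothetical)
            long partitionCount    = 1_000_000; // partitions estimated for the token range (hypothetical)
            long splitSize         = 65536;     // ConfigHelper.getInputSplitSize(conf) (hypothetical)

            int splitCount = (int) ((meanPartitionSize * partitionCount) / splitSize);
            long partitionsPerSplit = partitionCount / splitCount;

            // 15625 sub-ranges of roughly 64 partitions each; splitEvenly(splitCount)
            // then produces that many evenly sized token sub-ranges.
            System.out.println(splitCount + " sub-ranges, ~" + partitionsPerSplit + " partitions each");
        }
    }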

http://git-wip-us.apache.org/repos/asf/cassandra/blob/446e2537/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java b/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java
deleted file mode 100644
index 5282279..0000000
--- a/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.hadoop;
-
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.List;
-
-import org.apache.cassandra.thrift.Mutation;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.*;
-
-@Deprecated
-public class BulkOutputFormat extends OutputFormat<ByteBuffer,List<Mutation>>
-        implements org.apache.hadoop.mapred.OutputFormat<ByteBuffer,List<Mutation>>
-{
-    /** Fills the deprecated OutputFormat interface for streaming. */
-    @Deprecated
-    public BulkRecordWriter getRecordWriter(org.apache.hadoop.fs.FileSystem filesystem, org.apache.hadoop.mapred.JobConf job, String name, org.apache.hadoop.util.Progressable progress) throws IOException
-    {
-        return new BulkRecordWriter(job, progress);
-    }
-
-    @Override
-    public BulkRecordWriter getRecordWriter(final TaskAttemptContext context) throws IOException, InterruptedException
-    {
-        return new BulkRecordWriter(context);
-    }
-
-
-    @Override
-    public void checkOutputSpecs(JobContext context)
-    {
-        checkOutputSpecs(HadoopCompat.getConfiguration(context));
-    }
-
-    private void checkOutputSpecs(Configuration conf)
-    {
-        if (ConfigHelper.getOutputKeyspace(conf) == null)
-        {
-            throw new UnsupportedOperationException("you must set the keyspace with setColumnFamily()");
-        }
-    }
-
-    @Override
-    public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException
-    {
-        return new NullOutputCommitter();
-    }
-
-    /** Fills the deprecated OutputFormat interface for streaming. */
-    @Deprecated
-    public void checkOutputSpecs(org.apache.hadoop.fs.FileSystem filesystem, org.apache.hadoop.mapred.JobConf job) throws IOException
-    {
-        checkOutputSpecs(job);
-    }
-
-    public static class NullOutputCommitter extends OutputCommitter
-    {
-        public void abortTask(TaskAttemptContext taskContext) { }
-
-        public void cleanupJob(JobContext jobContext) { }
-
-        public void commitTask(TaskAttemptContext taskContext) { }
-
-        public boolean needsTaskCommit(TaskAttemptContext taskContext)
-        {
-            return false;
-        }
-
-        public void setupJob(JobContext jobContext) { }
-
-        public void setupTask(TaskAttemptContext taskContext) { }
-    }
-}
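
Jobs that wired up the Thrift-based BulkOutputFormat removed above can move to the CQL bulk output format in org.apache.cassandra.hadoop.cql3. What follows is a minimal, untested sketch of the job-side wiring; the address, keyspace, table and partitioner strings are placeholders, and the schema and insert-statement properties that the bulk writer expects are deliberately left out.

    import org.apache.cassandra.hadoop.ConfigHelper;
    import org.apache.cassandra.hadoop.cql3.CqlBulkOutputFormat;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    public class BulkOutputJobSketch
    {
        public static Job configure() throws Exception
        {
            Configuration conf = new Configuration();
            // Placeholder cluster and table settings.
            ConfigHelper.setOutputInitialAddress(conf, "127.0.0.1");
            ConfigHelper.setOutputColumnFamily(conf, "my_keyspace", "my_table");
            ConfigHelper.setOutputPartitioner(conf, "Murmur3Partitioner");

            Job job = Job.getInstance(conf, "bulk-load-sketch");
            job.setOutputFormatClass(CqlBulkOutputFormat.class);
            // The CQL bulk path also needs the table schema and an INSERT statement
            // on the Configuration; see CqlBulkRecordWriter (modified by this patch).
            return job;
        }
    }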

http://git-wip-us.apache.org/repos/asf/cassandra/blob/446e2537/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java b/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java
deleted file mode 100644
index 99abf9f..0000000
--- a/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java
+++ /dev/null
@@ -1,296 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.hadoop;
-
-import java.io.Closeable;
-import java.io.File;
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.nio.ByteBuffer;
-import java.util.*;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.config.Config;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.db.marshal.BytesType;
-import org.apache.cassandra.hadoop.cql3.CqlConfigHelper;
-import org.apache.cassandra.io.sstable.SSTableLoader;
-import org.apache.cassandra.io.sstable.SSTableSimpleUnsortedWriter;
-import org.apache.cassandra.streaming.StreamState;
-import org.apache.cassandra.thrift.Column;
-import org.apache.cassandra.thrift.CounterColumn;
-import org.apache.cassandra.thrift.Mutation;
-import org.apache.cassandra.utils.NativeSSTableLoaderClient;
-import org.apache.cassandra.utils.OutputHandler;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.util.Progressable;
-
-@Deprecated
-public final class BulkRecordWriter extends RecordWriter<ByteBuffer, List<Mutation>>
-        implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer, List<Mutation>>
-{
-    public final static String OUTPUT_LOCATION = "mapreduce.output.bulkoutputformat.localdir";
-    public final static String BUFFER_SIZE_IN_MB = "mapreduce.output.bulkoutputformat.buffersize";
-    public final static String STREAM_THROTTLE_MBITS = "mapreduce.output.bulkoutputformat.streamthrottlembits";
-    public final static String MAX_FAILED_HOSTS = "mapreduce.output.bulkoutputformat.maxfailedhosts";
-
-    private final Logger logger = LoggerFactory.getLogger(BulkRecordWriter.class);
-
-    protected final Configuration conf;
-    protected final int maxFailures;
-    protected final int bufferSize;
-    protected Closeable writer;
-    protected SSTableLoader loader;
-    protected Progressable progress;
-    protected TaskAttemptContext context;
-    private File outputDir;
-    
-    
-    private enum CFType
-    {
-        NORMAL,
-        SUPER,
-    }
-
-    private enum ColType
-    {
-        NORMAL,
-        COUNTER
-    }
-
-    private CFType cfType;
-    private ColType colType;
-
-    BulkRecordWriter(TaskAttemptContext context)
-    {
-
-        this(HadoopCompat.getConfiguration(context));
-        this.context = context;
-    }
-
-    BulkRecordWriter(Configuration conf, Progressable progress)
-    {
-        this(conf);
-        this.progress = progress;
-    }
-
-    BulkRecordWriter(Configuration conf)
-    {
-        Config.setOutboundBindAny(true);
-        this.conf = conf;
-        DatabaseDescriptor.setStreamThroughputOutboundMegabitsPerSec(Integer.parseInt(conf.get(STREAM_THROTTLE_MBITS, "0")));
-        maxFailures = Integer.parseInt(conf.get(MAX_FAILED_HOSTS, "0"));
-        bufferSize = Integer.parseInt(conf.get(BUFFER_SIZE_IN_MB, "64"));
-    }
-
-    protected String getOutputLocation() throws IOException
-    {
-        String dir = conf.get(OUTPUT_LOCATION, System.getProperty("java.io.tmpdir"));
-        if (dir == null)
-            throw new IOException("Output directory not defined, if hadoop is not setting java.io.tmpdir then define " + OUTPUT_LOCATION);
-        return dir;
-    }
-
-    private void setTypes(Mutation mutation)
-    {
-       if (cfType == null)
-       {
-           if (mutation.getColumn_or_supercolumn().isSetSuper_column() || mutation.getColumn_or_supercolumn().isSetCounter_super_column())
-               cfType = CFType.SUPER;
-           else
-               cfType = CFType.NORMAL;
-           if (mutation.getColumn_or_supercolumn().isSetCounter_column() || mutation.getColumn_or_supercolumn().isSetCounter_super_column())
-               colType = ColType.COUNTER;
-           else
-               colType = ColType.NORMAL;
-       }
-    }
-
-    private void prepareWriter() throws IOException
-    {
-        if (outputDir == null)
-        {
-            String keyspace = ConfigHelper.getOutputKeyspace(conf);
-            //dir must be named by ks/cf for the loader
-            outputDir = new File(getOutputLocation() + File.separator + keyspace + File.separator + ConfigHelper.getOutputColumnFamily(conf));
-            outputDir.mkdirs();
-        }
-        
-        if (writer == null)
-        {
-            AbstractType<?> subcomparator = null;
-
-            if (cfType == CFType.SUPER)
-                subcomparator = BytesType.instance;
-
-            writer = new SSTableSimpleUnsortedWriter(
-                    outputDir,
-                    ConfigHelper.getOutputPartitioner(conf),
-                    ConfigHelper.getOutputKeyspace(conf),
-                    ConfigHelper.getOutputColumnFamily(conf),
-                    BytesType.instance,
-                    subcomparator,
-                    Integer.parseInt(conf.get(BUFFER_SIZE_IN_MB, "64")),
-                    ConfigHelper.getOutputCompressionParamaters(conf));
-
-            this.loader = new SSTableLoader(outputDir, new ExternalClient(conf), new NullOutputHandler());
-        }
-    }
-
-    @Override
-    public void close(TaskAttemptContext context) throws IOException
-    {
-        close();
-    }
-
-    /** Fills the deprecated RecordWriter interface for streaming. */
-    @Deprecated
-    public void close(org.apache.hadoop.mapred.Reporter reporter) throws IOException
-    {
-        close();
-    }
-
-    private void close() throws IOException
-    {
-        if (writer != null)
-        {
-            writer.close();
-            Future<StreamState> future = loader.stream();
-            while (true)
-            {
-                try
-                {
-                    future.get(1000, TimeUnit.MILLISECONDS);
-                    break;
-                }
-                catch (ExecutionException | TimeoutException te)
-                {
-                    if (null != progress)
-                        progress.progress();
-                    if (null != context)
-                        HadoopCompat.progress(context);
-                }
-                catch (InterruptedException e)
-                {
-                    throw new IOException(e);
-                }
-            }
-            if (loader.getFailedHosts().size() > 0)
-            {
-                if (loader.getFailedHosts().size() > maxFailures)
-                    throw new IOException("Too many hosts failed: " + loader.getFailedHosts());
-                else
-                    logger.warn("Some hosts failed: {}", loader.getFailedHosts());
-            }
-        }
-    }
-
-    @Override
-    public void write(ByteBuffer keybuff, List<Mutation> value) throws IOException
-    {
-        setTypes(value.get(0));
-        prepareWriter();
-        SSTableSimpleUnsortedWriter ssWriter = (SSTableSimpleUnsortedWriter) writer;
-        ssWriter.newRow(keybuff);
-        for (Mutation mut : value)
-        {
-            if (cfType == CFType.SUPER)
-            {
-                ssWriter.newSuperColumn(mut.getColumn_or_supercolumn().getSuper_column().name);
-                if (colType == ColType.COUNTER)
-                    for (CounterColumn column : mut.getColumn_or_supercolumn().getCounter_super_column().columns)
-                        ssWriter.addCounterColumn(column.name, column.value);
-                else
-                {
-                    for (Column column : mut.getColumn_or_supercolumn().getSuper_column().columns)
-                    {
-                        if(column.ttl == 0)
-                            ssWriter.addColumn(column.name, column.value, column.timestamp);
-                        else
-                            ssWriter.addExpiringColumn(column.name, column.value, column.timestamp, column.ttl, System.currentTimeMillis() + ((long)column.ttl * 1000));
-                    }
-                }
-            }
-            else
-            {
-                if (colType == ColType.COUNTER)
-                    ssWriter.addCounterColumn(mut.getColumn_or_supercolumn().counter_column.name, mut.getColumn_or_supercolumn().counter_column.value);
-                else
-                {
-                    if(mut.getColumn_or_supercolumn().column.ttl == 0)
-                        ssWriter.addColumn(mut.getColumn_or_supercolumn().column.name, mut.getColumn_or_supercolumn().column.value, mut.getColumn_or_supercolumn().column.timestamp);
-                    else
-                        ssWriter.addExpiringColumn(mut.getColumn_or_supercolumn().column.name, mut.getColumn_or_supercolumn().column.value, mut.getColumn_or_supercolumn().column.timestamp, mut.getColumn_or_supercolumn().column.ttl, System.currentTimeMillis() + ((long)(mut.getColumn_or_supercolumn().column.ttl) * 1000));
-                }
-            }
-            if (null != progress)
-                progress.progress();
-            if (null != context)
-                HadoopCompat.progress(context);
-        }
-    }
-
-    public static class ExternalClient extends NativeSSTableLoaderClient
-    {
-        public ExternalClient(Configuration conf)
-        {
-            super(resolveHostAddresses(conf),
-                  CqlConfigHelper.getOutputNativePort(conf),
-                  ConfigHelper.getOutputKeyspaceUserName(conf),
-                  ConfigHelper.getOutputKeyspacePassword(conf),
-                  CqlConfigHelper.getSSLOptions(conf).orNull());
-        }
-
-        private static Collection<InetAddress> resolveHostAddresses(Configuration conf)
-        {
-            Set<InetAddress> addresses = new HashSet<>();
-
-            for (String host : ConfigHelper.getOutputInitialAddress(conf).split(","))
-            {
-                try
-                {
-                    addresses.add(InetAddress.getByName(host));
-                }
-                catch (UnknownHostException e)
-                {
-                    throw new RuntimeException(e);
-                }
-            }
-
-            return addresses;
-        }
-    }
-
-    public static class NullOutputHandler implements OutputHandler
-    {
-        public void output(String msg) {}
-        public void debug(String msg) {}
-        public void warn(String msg) {}
-        public void warn(String msg, Throwable th) {}
-    }
-}
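
The close() path removed above illustrates a pattern any streaming record writer needs: poll the stream future with a short timeout and report progress on every timeout so the Hadoop framework does not kill a long-running task attempt. Below is a self-contained sketch of that loop, with a plain executor task standing in for loader.stream(); names and timings are illustrative only.

    import java.util.concurrent.*;

    public class ProgressTickingWait
    {
        public static void main(String[] args) throws Exception
        {
            ExecutorService executor = Executors.newSingleThreadExecutor();
            // Stand-in for loader.stream(): a task that takes a few seconds to finish.
            Future<String> streaming = executor.submit(() -> {
                Thread.sleep(3000);
                return "stream complete";
            });

            while (true)
            {
                try
                {
                    System.out.println(streaming.get(1000, TimeUnit.MILLISECONDS));
                    break;
                }
                catch (TimeoutException te)
                {
                    // Where BulkRecordWriter called progress.progress() and HadoopCompat.progress(context),
                    // report liveness to whatever supervises the task.
                    System.out.println("still streaming...");
                }
            }
            executor.shutdown();
        }
    }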

http://git-wip-us.apache.org/repos/asf/cassandra/blob/446e2537/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
deleted file mode 100644
index 4662fa5..0000000
--- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.hadoop;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.*;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.auth.PasswordAuthenticator;
-import org.apache.cassandra.thrift.*;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.thrift.protocol.TBinaryProtocol;
-import org.apache.thrift.protocol.TProtocol;
-import org.apache.thrift.transport.TTransport;
-import org.apache.thrift.transport.TTransportException;
-
-/**
- * Hadoop InputFormat allowing map/reduce against Cassandra rows within one ColumnFamily.
- *
- * At minimum, you need to set the CF and predicate (description of columns to extract from each row)
- * in your Hadoop job Configuration.  The ConfigHelper class is provided to make this
- * simple:
- *   ConfigHelper.setInputColumnFamily
- *   ConfigHelper.setInputSlicePredicate
- *
- * You can also configure the number of rows per InputSplit with
- *   ConfigHelper.setInputSplitSize
- * This should be "as big as possible, but no bigger."  Each InputSplit is read from Cassandra
- * with multiple get_slice_range queries, and the per-call overhead of get_slice_range is high,
- * so larger split sizes are better -- but if it is too large, you will run out of memory.
- *
- * The default split size is 64k rows.
- */
-@Deprecated
-public class ColumnFamilyInputFormat extends AbstractColumnFamilyInputFormat<ByteBuffer, SortedMap<ByteBuffer, ColumnFamilyRecordReader.Column>>
-{
-    private static final Logger logger = LoggerFactory.getLogger(ColumnFamilyInputFormat.class);
-
-    @SuppressWarnings("resource")
-    public static Cassandra.Client createAuthenticatedClient(String location, int port, Configuration conf) throws Exception
-    {
-        logger.debug("Creating authenticated client for CF input format");
-        TTransport transport;
-        try
-        {
-            transport = ConfigHelper.getClientTransportFactory(conf).openTransport(location, port);
-        }
-        catch (Exception e)
-        {
-            throw new TTransportException("Failed to open a transport to " + location + ":" + port + ".", e);
-        }
-        TProtocol binaryProtocol = new TBinaryProtocol(transport, true, true);
-        Cassandra.Client client = new Cassandra.Client(binaryProtocol);
-
-        // log in
-        client.set_keyspace(ConfigHelper.getInputKeyspace(conf));
-        if ((ConfigHelper.getInputKeyspaceUserName(conf) != null) && (ConfigHelper.getInputKeyspacePassword(conf) != null))
-        {
-            Map<String, String> creds = new HashMap<String, String>();
-            creds.put(PasswordAuthenticator.USERNAME_KEY, ConfigHelper.getInputKeyspaceUserName(conf));
-            creds.put(PasswordAuthenticator.PASSWORD_KEY, ConfigHelper.getInputKeyspacePassword(conf));
-            AuthenticationRequest authRequest = new AuthenticationRequest(creds);
-            client.login(authRequest);
-        }
-        logger.debug("Authenticated client for CF input format created successfully");
-        return client;
-    }
-
-    public RecordReader<ByteBuffer, SortedMap<ByteBuffer, ColumnFamilyRecordReader.Column>> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException
-    {
-        return new ColumnFamilyRecordReader();
-    }
-
-    public org.apache.hadoop.mapred.RecordReader<ByteBuffer, SortedMap<ByteBuffer, ColumnFamilyRecordReader.Column>> getRecordReader(org.apache.hadoop.mapred.InputSplit split, JobConf jobConf, final Reporter reporter) throws IOException
-    {
-        TaskAttemptContext tac = HadoopCompat.newMapContext(
-                jobConf,
-                TaskAttemptID.forName(jobConf.get(MAPRED_TASK_ID)),
-                null,
-                null,
-                null,
-                new ReporterWrapper(reporter),
-                null);
-
-        ColumnFamilyRecordReader recordReader = new ColumnFamilyRecordReader(jobConf.getInt(CASSANDRA_HADOOP_MAX_KEY_SIZE, CASSANDRA_HADOOP_MAX_KEY_SIZE_DEFAULT));
-        recordReader.initialize((org.apache.hadoop.mapreduce.InputSplit)split, tac);
-        return recordReader;
-    }
-    
-    @Override
-    protected void validateConfiguration(Configuration conf)
-    {
-        super.validateConfiguration(conf);
-        
-        if (ConfigHelper.getInputSlicePredicate(conf) == null)
-        {
-            throw new UnsupportedOperationException("you must set the predicate with setInputSlicePredicate");
-        }
-    }
-
-}
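
The class comment removed above still describes the input-side knobs (ConfigHelper.setInputColumnFamily, setInputSlicePredicate, setInputSplitSize). With the CQL-based CqlInputFormat the same ConfigHelper is used, minus the Thrift slice predicate. A minimal, untested sketch of the input-side wiring follows; the address, keyspace and table names are placeholders.

    import org.apache.cassandra.hadoop.ConfigHelper;
    import org.apache.cassandra.hadoop.cql3.CqlInputFormat;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    public class CqlInputJobSketch
    {
        public static Job configure() throws Exception
        {
            Configuration conf = new Configuration();
            // Placeholder cluster and table settings.
            ConfigHelper.setInputInitialAddress(conf, "127.0.0.1");
            ConfigHelper.setInputColumnFamily(conf, "my_keyspace", "my_table");
            ConfigHelper.setInputPartitioner(conf, "Murmur3Partitioner");
            // "As big as possible, but no bigger" still applies to the split size.
            ConfigHelper.setInputSplitSize(conf, 64 * 1024);

            Job job = Job.getInstance(conf, "cql-input-sketch");
            job.setInputFormatClass(CqlInputFormat.class);
            return job;
        }
    }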
