KAFKA-2783; Drop outdated hadoop contrib modules

Author: Grant Henke <granthe...@gmail.com>

Reviewers: Gwen Shapira

Closes #466 from granthenke/drop-contrib

(cherry picked from commit 69af573b35f04657e31f60e636aba19ffa0b2c84)
Signed-off-by: Gwen Shapira <csh...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e176fcc7
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e176fcc7
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e176fcc7

Branch: refs/heads/0.9.0
Commit: e176fcc7fb146bf9be2d8d8f2a4f5e02f4753731
Parents: f22ea29
Author: Grant Henke <granthe...@gmail.com>
Authored: Mon Nov 9 11:02:46 2015 -0800
Committer: Gwen Shapira <csh...@gmail.com>
Committed: Mon Nov 9 11:03:07 2015 -0800

----------------------------------------------------------------------
 README.md                                       |   2 +-
 build.gradle                                    |  51 +--
 contrib/LICENSE                                 |   1 -
 contrib/NOTICE                                  |   1 -
 contrib/hadoop-consumer/README                  |  66 ---
 contrib/hadoop-consumer/copy-jars.sh            |  69 ---
 contrib/hadoop-consumer/hadoop-setup.sh         |  20 -
 contrib/hadoop-consumer/run-class.sh            |  65 ---
 .../main/java/kafka/etl/KafkaETLContext.java    | 270 -----------
 .../java/kafka/etl/KafkaETLInputFormat.java     |  78 ----
 .../src/main/java/kafka/etl/KafkaETLJob.java    | 172 -------
 .../src/main/java/kafka/etl/KafkaETLKey.java    | 104 -----
 .../java/kafka/etl/KafkaETLRecordReader.java    | 180 --------
 .../main/java/kafka/etl/KafkaETLRequest.java    | 129 ------
 .../src/main/java/kafka/etl/KafkaETLUtils.java  | 205 ---------
 .../src/main/java/kafka/etl/Props.java          | 458 -------------------
 .../kafka/etl/UndefinedPropertyException.java   |  28 --
 .../main/java/kafka/etl/impl/DataGenerator.java | 134 ------
 .../java/kafka/etl/impl/SimpleKafkaETLJob.java  | 104 -----
 .../kafka/etl/impl/SimpleKafkaETLMapper.java    |  91 ----
 contrib/hadoop-consumer/test/test.properties    |  42 --
 contrib/hadoop-producer/README.md               |  94 ----
 .../kafka/bridge/examples/TextPublisher.java    |  66 ---
 .../kafka/bridge/hadoop/KafkaOutputFormat.java  | 144 ------
 .../kafka/bridge/hadoop/KafkaRecordWriter.java  |  88 ----
 .../java/kafka/bridge/pig/AvroKafkaStorage.java | 115 -----
 docs/api.html                                   |  20 +-
 docs/documentation.html                         |   2 +-
 docs/implementation.html                        |  88 ++--
 settings.gradle                                 |   4 +-
 30 files changed, 54 insertions(+), 2837 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/e176fcc7/README.md
----------------------------------------------------------------------
diff --git a/README.md b/README.md
index dc15923..6baa17e 100644
--- a/README.md
+++ b/README.md
@@ -60,7 +60,7 @@ The release file can be found inside ./core/build/distributions/.
     ./gradlew -PscalaVersion=2.11.7 releaseTarGz
 
 ### Running a task for a specific project ###
-This is for 'core', 'contrib:hadoop-consumer', 'contrib:hadoop-producer', 'examples' and 'clients'
+This is for 'core', 'examples' and 'clients'
     ./gradlew core:jar
     ./gradlew core:test
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/e176fcc7/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 4ea0ee3..f9fd42a 100644
--- a/build.gradle
+++ b/build.gradle
@@ -229,7 +229,7 @@ for ( sv in ['2_10_5', '2_11_7'] ) {
 }
 
 def connectPkgs = ['connect:api', 'connect:runtime', 'connect:json', 'connect:file']
-def pkgs = ['clients', 'examples', 'contrib:hadoop-consumer', 'contrib:hadoop-producer', 'log4j-appender', 'tools', 'streams'] + connectPkgs
+def pkgs = ['clients', 'examples', 'log4j-appender', 'tools', 'streams'] + connectPkgs
 
 tasks.create(name: "jarConnect", dependsOn: connectPkgs.collect { it + ":jar" 
}) {}
 tasks.create(name: "jarAll", dependsOn: ['jar_core_2_10_5', 'jar_core_2_11_7'] 
+ pkgs.collect { it + ":jar" }) { }
@@ -376,55 +376,6 @@ project(':core') {
   }
 }
 
-project(':contrib:hadoop-consumer') {
-  archivesBaseName = "kafka-hadoop-consumer"
-
-  dependencies {
-    compile project(':core')
-    compile "org.apache.avro:avro:1.4.0"
-    compile "org.apache.pig:pig:0.8.0"
-    compile "commons-logging:commons-logging:1.0.4"
-    compile "org.codehaus.jackson:jackson-core-asl:1.5.5"
-    compile "org.codehaus.jackson:jackson-mapper-asl:1.5.5"
-    compile "org.apache.hadoop:hadoop-core:0.20.2"
-  }
-
-  configurations {
-    // manually excludes some unnecessary dependencies
-    compile.exclude module: 'javax'
-    compile.exclude module: 'jms'
-    compile.exclude module: 'jmxri'
-    compile.exclude module: 'jmxtools'
-    compile.exclude module: 'mail'
-    compile.exclude module: 'netty'
-  }
-}
-
-project(':contrib:hadoop-producer') {
-  archivesBaseName = "kafka-hadoop-producer"
-
-  dependencies {
-    compile project(':core')
-    compile("org.apache.avro:avro:1.4.0") { force = true }
-    compile "org.apache.pig:pig:0.8.0"
-    compile "commons-logging:commons-logging:1.0.4"
-    compile "org.codehaus.jackson:jackson-core-asl:1.5.5"
-    compile "org.codehaus.jackson:jackson-mapper-asl:1.5.5"
-    compile "org.apache.hadoop:hadoop-core:0.20.2"
-    compile "org.apache.pig:piggybank:0.12.0"
-  }
-
-  configurations {
-    // manually excludes some unnecessary dependencies
-    compile.exclude module: 'javax'
-    compile.exclude module: 'jms'
-    compile.exclude module: 'jmxri'
-    compile.exclude module: 'jmxtools'
-    compile.exclude module: 'mail'
-    compile.exclude module: 'netty'
-  }
-}
-
 project(':examples') {
   archivesBaseName = "kafka-examples"
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/e176fcc7/contrib/LICENSE
----------------------------------------------------------------------
diff --git a/contrib/LICENSE b/contrib/LICENSE
deleted file mode 120000
index ea5b606..0000000
--- a/contrib/LICENSE
+++ /dev/null
@@ -1 +0,0 @@
-../LICENSE
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/e176fcc7/contrib/NOTICE
----------------------------------------------------------------------
diff --git a/contrib/NOTICE b/contrib/NOTICE
deleted file mode 120000
index 7e1b82f..0000000
--- a/contrib/NOTICE
+++ /dev/null
@@ -1 +0,0 @@
-../NOTICE
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/e176fcc7/contrib/hadoop-consumer/README
----------------------------------------------------------------------
diff --git a/contrib/hadoop-consumer/README b/contrib/hadoop-consumer/README
deleted file mode 100644
index 54a2666..0000000
--- a/contrib/hadoop-consumer/README
+++ /dev/null
@@ -1,66 +0,0 @@
-This is a Hadoop job that pulls data from kafka server into HDFS.
-
-It requires the following inputs from a configuration file 
-(test/test.properties is an example)
-
-kafka.etl.topic : the topic to be fetched;
-
-input          : input directory containing topic offsets and
-                 it can be generated by DataGenerator; 
-                 the number of files in this directory determines the
-                 number of mappers in the hadoop job;
-
-output         : output directory containing kafka data and updated 
-                 topic offsets;
-
-kafka.request.limit : it is used to limit the number events fetched. 
-
-KafkaETLRecordReader is a record reader associated with KafkaETLInputFormat.
-It fetches kafka data from the server. It starts from provided offsets 
-(specified by "input") and stops when it reaches the largest available offsets 
-or the specified limit (specified by "kafka.request.limit").
-
-KafkaETLJob contains some helper functions to initialize job configuration.
-
-SimpleKafkaETLJob sets up job properties and files Hadoop job. 
-
-SimpleKafkaETLMapper dumps kafka data into hdfs. 
-
-HOW TO RUN:
-In order to run this, make sure the HADOOP_HOME environment variable points to 
-your hadoop installation directory.
-
-1. Compile using "sbt" to create a package for hadoop consumer code.
-./sbt package
-
-2. Run the hadoop-setup.sh script that enables write permission on the 
-   required HDFS directory
-
-3. Produce test events in server and generate offset files
-  1) Start kafka server [ Follow the quick start - 
-                        http://sna-projects.com/kafka/quickstart.php ]
-
-  2) Update test/test.properties to change the following parameters:  
-   kafka.etl.topic     : topic name
-   event.count         : number of events to be generated
-   kafka.server.uri     : kafka server uri;
-   input                : hdfs directory of offset files
-
-  3) Produce test events to Kafka server and generate offset files
-   ./run-class.sh kafka.etl.impl.DataGenerator test/test.properties
-
-4. Fetch generated topic into HDFS:
-  1) Update test/test.properties to change the following parameters:
-       hadoop.job.ugi  : id and group 
-       input           : input location 
-       output          : output location 
-       kafka.request.limit: limit the number of events to be fetched; 
-                            -1 means no limitation.
-        hdfs.default.classpath.dir : hdfs location of jars
-
-  2) copy jars into hdfs
-   ./copy-jars.sh ${hdfs.default.classpath.dir}
-
-  2) Fetch data
-  ./run-class.sh kafka.etl.impl.SimpleKafkaETLJob test/test.properties
-

http://git-wip-us.apache.org/repos/asf/kafka/blob/e176fcc7/contrib/hadoop-consumer/copy-jars.sh
----------------------------------------------------------------------
diff --git a/contrib/hadoop-consumer/copy-jars.sh b/contrib/hadoop-consumer/copy-jars.sh
deleted file mode 100755
index e5de1dd..0000000
--- a/contrib/hadoop-consumer/copy-jars.sh
+++ /dev/null
@@ -1,69 +0,0 @@
-#!/bin/bash
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-# 
-#    http://www.apache.org/licenses/LICENSE-2.0
-# 
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-if [ $# -lt 1 ];
-then
-  echo "USAGE: $0 dir"
-  exit 1
-fi
-
-base_dir=$(dirname $0)/../..
-
-hadoop=${HADOOP_HOME}/bin/hadoop
-
-echo "$hadoop fs -rmr $1"
-$hadoop fs -rmr $1
-
-echo "$hadoop fs -mkdir $1"
-$hadoop fs -mkdir $1
-
-# include kafka jars
-for file in $base_dir/contrib/hadoop-consumer/target/scala_2.8.0/*.jar;
-do
-   echo "$hadoop fs -put $file $1/"
-   $hadoop fs -put $file $1/ 
-done
-
-# include kafka jars
-echo "$hadoop fs -put $base_dir/core/target/scala_2.8.0/kafka-*.jar; $1/"
-$hadoop fs -put $base_dir/core/target/scala_2.8.0/kafka-*.jar $1/ 
-
-# include core lib jars
-for file in $base_dir/core/lib/*.jar;
-do
-   echo "$hadoop fs -put $file $1/"
-   $hadoop fs -put $file $1/ 
-done
-
-for file in $base_dir/core/lib_managed/scala_2.8.0/compile/*.jar;
-do
-   echo "$hadoop fs -put $file $1/"
-   $hadoop fs -put $file $1/ 
-done
-
-# include scala library jar
-echo "$hadoop fs -put 
$base_dir/project/boot/scala-2.8.0/lib/scala-library.jar; $1/"
-$hadoop fs -put $base_dir/project/boot/scala-2.8.0/lib/scala-library.jar $1/
-
-local_dir=$(dirname $0)
-
-# include hadoop-consumer jars
-for file in $local_dir/lib/*.jar;
-do
-   echo "$hadoop fs -put $file $1/"
-   $hadoop fs -put $file $1/ 
-done
-

http://git-wip-us.apache.org/repos/asf/kafka/blob/e176fcc7/contrib/hadoop-consumer/hadoop-setup.sh
----------------------------------------------------------------------
diff --git a/contrib/hadoop-consumer/hadoop-setup.sh b/contrib/hadoop-consumer/hadoop-setup.sh
deleted file mode 100755
index c855e66..0000000
--- a/contrib/hadoop-consumer/hadoop-setup.sh
+++ /dev/null
@@ -1,20 +0,0 @@
-#!/bin/bash
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-# 
-#    http://www.apache.org/licenses/LICENSE-2.0
-# 
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-hadoop=${HADOOP_HOME}/bin/hadoop
-
-$hadoop fs -chmod ugoa+w /tmp
-

http://git-wip-us.apache.org/repos/asf/kafka/blob/e176fcc7/contrib/hadoop-consumer/run-class.sh
----------------------------------------------------------------------
diff --git a/contrib/hadoop-consumer/run-class.sh b/contrib/hadoop-consumer/run-class.sh
deleted file mode 100755
index bfb4744..0000000
--- a/contrib/hadoop-consumer/run-class.sh
+++ /dev/null
@@ -1,65 +0,0 @@
-#!/bin/bash
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-# 
-#    http://www.apache.org/licenses/LICENSE-2.0
-# 
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-if [ $# -lt 1 ];
-then
-  echo "USAGE: $0 classname [opts]"
-  exit 1
-fi
-
-base_dir=$(dirname $0)/../..
-
-# include kafka jars
-for file in $base_dir/core/target/scala_2.8.0/kafka-*.jar
-do
-  CLASSPATH=$CLASSPATH:$file
-done
-
-for file in $base_dir/contrib/hadoop-consumer/lib_managed/scala_2.8.0/compile/*.jar;
-do
-  CLASSPATH=$CLASSPATH:$file
-done
-
-local_dir=$(dirname $0)
-
-# include hadoop-consumer jars
-for file in $base_dir/contrib/hadoop-consumer/target/scala_2.8.0/*.jar;
-do
-  CLASSPATH=$CLASSPATH:$file
-done
-
-for file in $base_dir/contrib/hadoop-consumer/lib/*.jar;
-do
-  CLASSPATH=$CLASSPATH:$file
-done
-
-CLASSPATH=$CLASSPATH:$base_dir/project/boot/scala-2.8.0/lib/scala-library.jar
-
-echo $CLASSPATH
-
-CLASSPATH=dist:$CLASSPATH:${HADOOP_HOME}/conf
-
-#if [ -z "$KAFKA_OPTS" ]; then
-#  KAFKA_OPTS="-Xmx512M -server -Dcom.sun.management.jmxremote"
-#fi
-
-if [ -z "$JAVA_HOME" ]; then
-  JAVA="java"
-else
-  JAVA="$JAVA_HOME/bin/java"
-fi
-
-$JAVA $KAFKA_OPTS -cp $CLASSPATH $@

http://git-wip-us.apache.org/repos/asf/kafka/blob/e176fcc7/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java
----------------------------------------------------------------------
diff --git a/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java b/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java
deleted file mode 100644
index c9b9018..0000000
--- a/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java
+++ /dev/null
@@ -1,270 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.etl;
-
-
-import java.io.IOException;
-import java.net.URI;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import kafka.api.FetchRequest;
-import kafka.api.FetchRequestBuilder;
-import kafka.api.PartitionOffsetRequestInfo;
-import kafka.common.TopicAndPartition;
-import kafka.javaapi.FetchResponse;
-import kafka.javaapi.OffsetRequest;
-import kafka.javaapi.consumer.SimpleConsumer;
-import kafka.javaapi.message.ByteBufferMessageSet;
-import kafka.message.MessageAndOffset;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.lib.MultipleOutputs;
-
-@SuppressWarnings({ "deprecation"})
-public class KafkaETLContext {
-    
-    static protected int MAX_RETRY_TIME = 1;
-    final static String CLIENT_BUFFER_SIZE = "client.buffer.size";
-    final static String CLIENT_TIMEOUT = "client.so.timeout";
-
-    final static int DEFAULT_BUFFER_SIZE = 1 * 1024 * 1024;
-    final static int DEFAULT_TIMEOUT = 60000; // one minute
-
-    final static KafkaETLKey DUMMY_KEY = new KafkaETLKey();
-
-    protected int _index; /*index of context*/
-    protected String _input = null; /*input string*/
-    protected KafkaETLRequest _request = null;
-    protected SimpleConsumer _consumer = null; /*simple consumer*/
-
-    protected long[] _offsetRange = {0, 0};  /*offset range*/
-    protected long _offset = Long.MAX_VALUE; /*current offset*/
-    protected long _count; /*current count*/
-
-    protected FetchResponse _response = null;  /*fetch response*/
-    protected Iterator<MessageAndOffset> _messageIt = null; /*message iterator*/
-    protected Iterator<ByteBufferMessageSet> _respIterator = null;
-    protected int _retry = 0;
-    protected long _requestTime = 0; /*accumulative request time*/
-    protected long _startTime = -1;
-    
-    protected int _bufferSize;
-    protected int _timeout;
-    protected Reporter _reporter;
-    
-    protected MultipleOutputs _mos;
-    protected OutputCollector<KafkaETLKey, BytesWritable> _offsetOut = null;
-    protected FetchRequestBuilder builder = new FetchRequestBuilder();
-    
-    public long getTotalBytes() {
-        return (_offsetRange[1] > _offsetRange[0])? _offsetRange[1] - _offsetRange[0] : 0;
-    }
-    
-    public long getReadBytes() {
-        return _offset - _offsetRange[0];
-    }
-    
-    public long getCount() {
-        return _count;
-    }
-    
-    /**
-     * construct using input string
-     */
-    @SuppressWarnings("unchecked")
-    public KafkaETLContext(JobConf job, Props props, Reporter reporter, 
-                                    MultipleOutputs mos, int index, String input) 
-    throws Exception {
-        
-        _bufferSize = getClientBufferSize(props);
-        _timeout = getClientTimeout(props);
-        System.out.println("bufferSize=" +_bufferSize);
-        System.out.println("timeout=" + _timeout);
-        _reporter = reporter;
-        _mos = mos;
-        
-        // read topic and current offset from input
-        _index= index; 
-        _input = input;
-        _request = new KafkaETLRequest(input.trim());
-        
-        // read data from queue
-        URI uri = _request.getURI();
-        _consumer = new SimpleConsumer(uri.getHost(), uri.getPort(), _timeout, _bufferSize, "KafkaETLContext");
-        
-        // get available offset range
-        _offsetRange = getOffsetRange();
-        System.out.println("Connected to node " + uri 
-                + " beginning reading at offset " + _offsetRange[0]
-                + " latest offset=" + _offsetRange[1]);
-
-        _offset = _offsetRange[0];
-        _count = 0;
-        _requestTime = 0;
-        _retry = 0;
-        
-        _startTime = System.currentTimeMillis();
-    }
-    
-    public boolean hasMore () {
-        return _messageIt != null && _messageIt.hasNext() 
-                || _response != null && _respIterator.hasNext()
-                || _offset < _offsetRange[1]; 
-    }
-    
-    public boolean getNext(KafkaETLKey key, BytesWritable value) throws IOException {
-        if ( !hasMore() ) return false;
-        
-        boolean gotNext = get(key, value);
-
-        if(_response != null) {
-
-            while ( !gotNext && _respIterator.hasNext()) {
-                ByteBufferMessageSet msgSet = _respIterator.next();
-                _messageIt = msgSet.iterator();
-                gotNext = get(key, value);
-            }
-        }
-        return gotNext;
-    }
-    
-    public boolean fetchMore () throws IOException {
-        if (!hasMore()) return false;
-
-        FetchRequest fetchRequest = builder
-                .clientId(_request.clientId())
-                .addFetch(_request.getTopic(), _request.getPartition(), _offset, _bufferSize)
-                .build();
-
-        long tempTime = System.currentTimeMillis();
-        _response = _consumer.fetch(fetchRequest);
-        if(_response != null) {
-            _respIterator = new ArrayList<ByteBufferMessageSet>(){{
-                add(_response.messageSet(_request.getTopic(), _request.getPartition()));
-            }}.iterator();
-        }
-        _requestTime += (System.currentTimeMillis() - tempTime);
-        
-        return true;
-    }
-    
-    @SuppressWarnings("unchecked")
-    public void output(String fileprefix) throws IOException {
-       String offsetString = _request.toString(_offset);
-
-        if (_offsetOut == null)
-            _offsetOut = (OutputCollector<KafkaETLKey, BytesWritable>)
-                                    _mos.getCollector("offsets", fileprefix+_index, _reporter);
-        _offsetOut.collect(DUMMY_KEY, new BytesWritable(offsetString.getBytes("UTF-8")));
-        
-    }
-    
-    public void close() throws IOException {
-        if (_consumer != null) _consumer.close();
-        
-        String topic = _request.getTopic();
-        long endTime = System.currentTimeMillis();
-        _reporter.incrCounter(topic, "read-time(ms)", endTime - _startTime);
-        _reporter.incrCounter(topic, "request-time(ms)", _requestTime);
-        
-        long bytesRead = _offset - _offsetRange[0];
-        double megaRead = bytesRead / (1024.0*1024.0);
-        _reporter.incrCounter(topic, "data-read(mb)", (long) megaRead);
-        _reporter.incrCounter(topic, "event-count", _count);
-    }
-    
-    protected boolean get(KafkaETLKey key, BytesWritable value) throws IOException {
-        if (_messageIt != null && _messageIt.hasNext()) {
-            MessageAndOffset messageAndOffset = _messageIt.next();
-            
-            ByteBuffer buf = messageAndOffset.message().buffer();
-            int origSize = buf.remaining();
-            byte[] bytes = new byte[origSize];
-          buf.get(bytes, buf.position(), origSize);
-            value.set(bytes, 0, origSize);
-            
-            key.set(_index, _offset, messageAndOffset.message().checksum());
-            
-            _offset = messageAndOffset.nextOffset();  //increase offset
-            _count ++;  //increase count
-            
-            return true;
-        }
-        else return false;
-    }
-    
-    /**
-     * Get offset ranges
-     */
-    protected long[] getOffsetRange() throws IOException {
-
-        /* get smallest and largest offsets*/
-        long[] range = new long[2];
-
-        TopicAndPartition topicAndPartition = new TopicAndPartition(_request.getTopic(), _request.getPartition());
-        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo =
-                new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
-        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.EarliestTime(), 1));
-        OffsetRequest request = new OffsetRequest(
-            requestInfo, kafka.api.OffsetRequest.CurrentVersion(), kafka.api.OffsetRequest.DefaultClientId());
-        long[] startOffsets = _consumer.getOffsetsBefore(request).offsets(_request.getTopic(), _request.getPartition());
-        if (startOffsets.length != 1)
-            throw new IOException("input:" + _input + " Expect one smallest offset but get "
-                                            + startOffsets.length);
-        range[0] = startOffsets[0];
-        
-        requestInfo.clear();
-        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.LatestTime(), 1));
-        request = new OffsetRequest(
-            requestInfo, kafka.api.OffsetRequest.CurrentVersion(), kafka.api.OffsetRequest.DefaultClientId());
-        long[] endOffsets = _consumer.getOffsetsBefore(request).offsets(_request.getTopic(), _request.getPartition());
-        if (endOffsets.length != 1)
-            throw new IOException("input:" + _input + " Expect one latest offset but get " 
-                                            + endOffsets.length);
-        range[1] = endOffsets[0];
-
-        /*adjust range based on input offsets*/
-        if ( _request.isValidOffset()) {
-            long startOffset = _request.getOffset();
-            if (startOffset > range[0]) {
-                System.out.println("Update starting offset with " + startOffset);
-                range[0] = startOffset;
-            }
-            else {
-                System.out.println("WARNING: given starting offset " + startOffset 
-                                            + " is smaller than the smallest one " + range[0] 
-                                            + ". Will ignore it.");
-            }
-        }
-        System.out.println("Using offset range [" + range[0] + ", " + range[1] + "]");
-        return range;
-    }
-    
-    public static int getClientBufferSize(Props props) throws Exception {
-        return props.getInt(CLIENT_BUFFER_SIZE, DEFAULT_BUFFER_SIZE);
-    }
-
-    public static int getClientTimeout(Props props) throws Exception {
-        return props.getInt(CLIENT_TIMEOUT, DEFAULT_TIMEOUT);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/e176fcc7/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLInputFormat.java
----------------------------------------------------------------------
diff --git a/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLInputFormat.java b/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLInputFormat.java
deleted file mode 100644
index ddd6b72..0000000
--- a/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLInputFormat.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.etl;
-
-
-import java.io.IOException;
-import java.net.URI;
-import java.util.Map;
-import kafka.consumer.SimpleConsumer;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.SequenceFileInputFormat;
-import org.apache.hadoop.mapred.lib.MultipleOutputs;
-
-
-@SuppressWarnings("deprecation")
-public class KafkaETLInputFormat 
-extends SequenceFileInputFormat<KafkaETLKey, BytesWritable> {
-
-    protected Props _props;
-    protected int _bufferSize;
-    protected int _soTimeout;
-
-    protected Map<Integer, URI> _nodes;
-    protected int _partition;
-    protected int _nodeId;
-    protected String _topic;
-    protected SimpleConsumer _consumer;
-
-    protected MultipleOutputs _mos;
-    protected OutputCollector<BytesWritable, BytesWritable> _offsetOut = null;
-
-    protected long[] _offsetRange;
-    protected long _startOffset;
-    protected long _offset;
-    protected boolean _toContinue = true;
-    protected int _retry;
-    protected long _timestamp;
-    protected long _count;
-    protected boolean _ignoreErrors = false;
-
-    @Override
-    public RecordReader<KafkaETLKey, BytesWritable> getRecordReader(InputSplit split,
-                                    JobConf job, Reporter reporter)
-                                    throws IOException {
-        return new KafkaETLRecordReader(split, job, reporter);
-    }
-
-    @Override
-    protected boolean isSplitable(FileSystem fs, Path file) {
-        return super.isSplitable(fs, file);
-    }
-
-    @Override
-    public InputSplit[] getSplits(JobConf conf, int numSplits) throws IOException {
-        return super.getSplits(conf, numSplits);
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/e176fcc7/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLJob.java
----------------------------------------------------------------------
diff --git a/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLJob.java b/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLJob.java
deleted file mode 100644
index 1a4bcba..0000000
--- a/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLJob.java
+++ /dev/null
@@ -1,172 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.etl;
-
-
-import java.net.URI;
-import org.apache.hadoop.filecache.DistributedCache;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.SequenceFileOutputFormat;
-import org.apache.hadoop.mapred.lib.MultipleOutputs;
-
-@SuppressWarnings("deprecation")
-public class KafkaETLJob {
-    
-    public static final String HADOOP_PREFIX = "hadoop-conf.";
-    /**
-     * Create a job configuration
-     */
-    @SuppressWarnings("rawtypes")
-    public static JobConf createJobConf(String name, String topic, Props props, Class classobj) 
-    throws Exception {
-        JobConf conf = getJobConf(name, props, classobj);
-        
-        conf.set("topic", topic);
-        
-        // input format
-        conf.setInputFormat(KafkaETLInputFormat.class);
-
-        //turn off mapper speculative execution
-        conf.setMapSpeculativeExecution(false);
-        
-        // setup multiple outputs
-        MultipleOutputs.addMultiNamedOutput(conf, "offsets", SequenceFileOutputFormat.class, 
-                    KafkaETLKey.class, BytesWritable.class);
-
-
-        return conf;
-    }
-    
-    /**
-     * Helper function to initialize a job configuration
-     */
-    public static JobConf getJobConf(String name, Props props, Class classobj) throws Exception {
-        JobConf conf = new JobConf();
-        // set custom class loader with custom find resource strategy.
-
-        conf.setJobName(name);
-        String hadoop_ugi = props.getProperty("hadoop.job.ugi", null);
-        if (hadoop_ugi != null) {
-            conf.set("hadoop.job.ugi", hadoop_ugi);
-        }
-
-        if (props.getBoolean("is.local", false)) {
-            conf.set("mapred.job.tracker", "local");
-            conf.set("fs.default.name", "file:///");
-            conf.set("mapred.local.dir", "/tmp/map-red");
-
-            info("Running locally, no hadoop jar set.");
-        } else {
-            setClassLoaderAndJar(conf, classobj);
-            info("Setting hadoop jar file for class:" + classobj + "  to " + conf.getJar());
-            info("*************************************************************************");
-            info("          Running on Real Hadoop Cluster(" + conf.get("mapred.job.tracker") + ")           ");
-            info("*************************************************************************");
-        }
-
-        // set JVM options if present
-        if (props.containsKey("mapred.child.java.opts")) {
-            conf.set("mapred.child.java.opts", props.getProperty("mapred.child.java.opts"));
-            info("mapred.child.java.opts set to " + props.getProperty("mapred.child.java.opts"));
-        }
-
-        // Adds External jars to hadoop classpath
-        String externalJarList = props.getProperty("hadoop.external.jarFiles", null);
-        if (externalJarList != null) {
-            String[] jarFiles = externalJarList.split(",");
-            for (String jarFile : jarFiles) {
-                info("Adding extenral jar File:" + jarFile);
-                DistributedCache.addFileToClassPath(new Path(jarFile), conf);
-            }
-        }
-
-        // Adds distributed cache files
-        String cacheFileList = props.getProperty("hadoop.cache.files", null);
-        if (cacheFileList != null) {
-            String[] cacheFiles = cacheFileList.split(",");
-            for (String cacheFile : cacheFiles) {
-                info("Adding Distributed Cache File:" + cacheFile);
-                DistributedCache.addCacheFile(new URI(cacheFile), conf);
-            }
-        }
-
-        // Adds distributed cache files
-        String archiveFileList = props.getProperty("hadoop.cache.archives", null);
-        if (archiveFileList != null) {
-            String[] archiveFiles = archiveFileList.split(",");
-            for (String archiveFile : archiveFiles) {
-                info("Adding Distributed Cache Archive File:" + archiveFile);
-                DistributedCache.addCacheArchive(new URI(archiveFile), conf);
-            }
-        }
-
-        String hadoopCacheJarDir = props.getProperty("hdfs.default.classpath.dir", null);
-        if (hadoopCacheJarDir != null) {
-            FileSystem fs = FileSystem.get(conf);
-            if (fs != null) {
-                FileStatus[] status = fs.listStatus(new Path(hadoopCacheJarDir));
-
-                if (status != null) {
-                    for (int i = 0; i < status.length; ++i) {
-                        if (!status[i].isDir()) {
-                            Path path = new Path(hadoopCacheJarDir, status[i].getPath().getName());
-                            info("Adding Jar to Distributed Cache Archive File:" + path);
-
-                            DistributedCache.addFileToClassPath(path, conf);
-                        }
-                    }
-                } else {
-                    info("hdfs.default.classpath.dir " + hadoopCacheJarDir + " is empty.");
-                }
-            } else {
-                info("hdfs.default.classpath.dir " + hadoopCacheJarDir + " filesystem doesn't exist");
-            }
-        }
-
-        // May want to add this to HadoopUtils, but will await refactoring
-        for (String key : props.stringPropertyNames()) {
-            String lowerCase = key.toLowerCase();
-            if (lowerCase.startsWith(HADOOP_PREFIX)) {
-                String newKey = key.substring(HADOOP_PREFIX.length());
-                conf.set(newKey, props.getProperty(key));
-            }
-        }
-
-        KafkaETLUtils.setPropsInJob(conf, props);
-        
-        return conf;
-    }
-
-    public static void info(String message) {
-        System.out.println(message);
-    }
-
-    public static void setClassLoaderAndJar(JobConf conf,
-            @SuppressWarnings("rawtypes") Class jobClass) {
-        conf.setClassLoader(Thread.currentThread().getContextClassLoader());
-        String jar = KafkaETLUtils.findContainingJar(jobClass, Thread
-                .currentThread().getContextClassLoader());
-        if (jar != null) {
-            conf.setJar(jar);
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/e176fcc7/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLKey.java
----------------------------------------------------------------------
diff --git a/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLKey.java b/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLKey.java
deleted file mode 100644
index aafecea..0000000
--- a/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLKey.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.etl;
-
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import org.apache.hadoop.io.WritableComparable;
-
-public class KafkaETLKey implements WritableComparable<KafkaETLKey>{
-
-    protected int _inputIndex;
-    protected long _offset;
-    protected long _checksum;
-    
-    /**
-     * dummy empty constructor
-     */
-    public KafkaETLKey() {
-        _inputIndex = 0;
-        _offset = 0;
-        _checksum = 0;
-    }
-    
-    public KafkaETLKey (int index, long offset) {
-        _inputIndex =  index;
-        _offset = offset;
-        _checksum = 0;
-    }
-    
-    public KafkaETLKey (int index, long offset, long checksum) {
-        _inputIndex =  index;
-        _offset = offset;
-        _checksum = checksum;
-    }
-    
-    public void set(int index, long offset, long checksum) {
-        _inputIndex = index;
-        _offset = offset;
-        _checksum = checksum;
-    }
-    
-    public int getIndex() {
-        return _inputIndex;
-    }
-    
-    public long getOffset() {
-        return _offset;
-    }
-    
-    public long getChecksum() {
-        return _checksum;
-    }
-    
-    @Override
-    public void readFields(DataInput in) throws IOException {
-        _inputIndex = in.readInt(); 
-        _offset = in.readLong();
-        _checksum = in.readLong();
-    }
-
-    @Override
-    public void write(DataOutput out) throws IOException {
-        out.writeInt(_inputIndex);
-        out.writeLong(_offset);
-        out.writeLong(_checksum);
-    }
-
-    @Override
-    public int compareTo(KafkaETLKey o) {
-        if (_inputIndex != o._inputIndex)
-            return _inputIndex - o._inputIndex;
-        else {
-            if  (_offset > o._offset) return 1;
-            else if (_offset < o._offset) return -1;
-            else {
-                if  (_checksum > o._checksum) return 1;
-                else if (_checksum < o._checksum) return -1;
-                else return 0;
-            }
-        }
-    }
-    
-    @Override
-    public String toString() {
-        return "index=" + _inputIndex + " offset=" + _offset + " checksum=" + _checksum;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/e176fcc7/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLRecordReader.java
----------------------------------------------------------------------
diff --git a/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLRecordReader.java b/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLRecordReader.java
deleted file mode 100644
index f040fbe..0000000
--- a/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLRecordReader.java
+++ /dev/null
@@ -1,180 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.etl;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import kafka.common.KafkaException;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.mapred.FileSplit;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.SequenceFileRecordReader;
-import org.apache.hadoop.mapred.lib.MultipleOutputs;
-
-@SuppressWarnings({ "deprecation" })
-public class KafkaETLRecordReader 
-extends SequenceFileRecordReader<KafkaETLKey, BytesWritable> {
-
-    /* max number of retries */
-    protected Props _props;   /*properties*/
-    protected JobConf _job;
-    protected Reporter _reporter ;
-    protected MultipleOutputs _mos;
-    protected List<KafkaETLContext> _contextList;
-    protected int _contextIndex ;
-    
-    protected long _totalBytes;
-    protected long _readBytes;
-    protected long _readCounts;
-    
-    protected String _attemptId = null;
-    
-    private static long _limit = 100; /*for testing only*/
-    
-    public KafkaETLRecordReader(InputSplit split, JobConf job, Reporter reporter) 
-    throws IOException {
-       super(job, (FileSplit) split);
-       
-       _props = KafkaETLUtils.getPropsFromJob(job);
-       _contextList = new ArrayList<KafkaETLContext>();
-       _job = job;
-       _reporter = reporter;
-       _contextIndex = -1;
-       _mos = new MultipleOutputs(job);
-       try {
-           _limit = _props.getInt("kafka.request.limit", -1);
-           
-           /*get attemp id*/
-           String taskId = _job.get("mapred.task.id");
-           if (taskId == null) {
-               throw new KafkaException("Configuration does not contain the property mapred.task.id");
-           }
-           String[] parts = taskId.split("_");
-           if (    parts.length != 6 || !parts[0].equals("attempt") 
-                || (!"m".equals(parts[3]) && !"r".equals(parts[3]))) {
-                   throw new KafkaException("TaskAttemptId string : " + taskId + " is not properly formed");
-           }
-          _attemptId = parts[4]+parts[3];
-       }catch (Exception e) {
-           throw new IOException (e);
-       }
-    }
-
-    @Override
-    public synchronized void close() throws IOException {
-        super.close();
-        
-        /* now record some stats */
-        for (KafkaETLContext context: _contextList) {
-            context.output(_attemptId);
-            context.close();
-        }
-        
-        _mos.close();
-    }
-
-    @Override
-    public KafkaETLKey createKey() {
-        return super.createKey();
-    }
-
-    @Override
-    public BytesWritable createValue() {
-        return super.createValue();
-    }
-
-    @Override
-    public float getProgress() throws IOException {
-        if (_totalBytes == 0) return 0f;
-        
-        if (_contextIndex >= _contextList.size()) return 1f;
-        
-        if (_limit < 0) {
-            double p = ( _readBytes + getContext().getReadBytes() ) / ((double) _totalBytes);
-            return (float)p;
-        }
-        else {
-            double p = (_readCounts + getContext().getCount()) / ((double)_limit * _contextList.size());
-            return (float)p;
-        }
-    }
-
-    @Override
-    public synchronized boolean next(KafkaETLKey key, BytesWritable value)
-                                    throws IOException {
-    try{
-        if (_contextIndex < 0) { /* first call, get all requests */
-            System.out.println("RecordReader.next init()");
-            _totalBytes = 0;
-            
-            while ( super.next(key, value)) {
-                String input = new String(value.getBytes(), "UTF-8");
-                int index = _contextList.size();
-                KafkaETLContext context = new KafkaETLContext(
-                                              _job, _props, _reporter, _mos, index, input);
-                _contextList.add(context);
-                _totalBytes += context.getTotalBytes();
-            }
-            System.out.println("Number of requests=" + _contextList.size());
-            
-            _readBytes = 0;
-            _readCounts = 0;
-            _contextIndex = 0;
-        }
-        
-        while (_contextIndex < _contextList.size()) {
-            
-            KafkaETLContext currContext = getContext();
-            
-            while (currContext.hasMore() && 
-                       (_limit < 0 || currContext.getCount() < _limit)) {
-                
-                if (currContext.getNext(key, value)) {
-                    //System.out.println("RecordReader.next get (key,value)");
-                    return true;
-                }
-                else {
-                    //System.out.println("RecordReader.next fetch more");
-                    currContext.fetchMore();
-                }
-            }
-            
-            _readBytes += currContext.getReadBytes();
-            _readCounts += currContext.getCount();
-            _contextIndex ++;
-            System.out.println("RecordReader.next will get from request " + _contextIndex);
-       }
-    }catch (Exception e) {
-        throw new IOException (e);
-    }
-    return false;
-    }
-    
-    protected KafkaETLContext getContext() throws IOException{
-        if (_contextIndex >= _contextList.size()) 
-            throw new IOException ("context index " + _contextIndex + " is out of bound " 
-                                            + _contextList.size());
-        return _contextList.get(_contextIndex);
-    }
-
-    
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/e176fcc7/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLRequest.java
----------------------------------------------------------------------
diff --git a/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLRequest.java b/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLRequest.java
deleted file mode 100644
index 87df0ea..0000000
--- a/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLRequest.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.etl;
-
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.Map;
-
-public class KafkaETLRequest {
-    public static long DEFAULT_OFFSET = -1;
-    public static String DELIM = "\t";
-    
-    String _topic;
-    URI _uri;
-    int _partition;
-    long _offset = DEFAULT_OFFSET;
-    String _clientId = "KafkaHadoopETL";
-    
-    public KafkaETLRequest() {
-        
-    }
-    
-    public KafkaETLRequest(String input) throws IOException {
-        //System.out.println("Init request from " + input);
-        String[] pieces = input.trim().split(DELIM);
-        if (pieces.length != 4)
-            throw new IOException( input + 
-                                            " : input must be in the form 'url" + DELIM +
-                                            "topic" + DELIM +"partition" + DELIM +"offset'");
-
-        try {
-            _uri = new URI (pieces[0]); 
-        }catch (java.net.URISyntaxException e) {
-            throw new IOException (e);
-        }
-        _topic = pieces[1];
-        _partition = Integer.valueOf(pieces[2]);
-        _offset = Long.valueOf(pieces[3]);
-    }
-    
-    public KafkaETLRequest(String node, String topic, String partition, String offset, 
-                                    Map<String, String> nodes) throws IOException {
-
-        Integer nodeId = Integer.parseInt(node);
-        String uri = nodes.get(nodeId.toString());
-        if (uri == null) throw new IOException ("Cannot form node for id " + nodeId);
-        
-        try {
-            _uri = new URI (uri); 
-        }catch (java.net.URISyntaxException e) {
-            throw new IOException (e);
-        }
-        _topic = topic;
-        _partition = Integer.valueOf(partition);
-        _offset = Long.valueOf(offset);
-    }
-    
-    public KafkaETLRequest(String topic, String uri, int partition) throws URISyntaxException {
-        _topic = topic;
-        _uri = new URI(uri);
-        _partition = partition;
-    }
-    
-    public void setDefaultOffset() {
-        _offset = DEFAULT_OFFSET;
-    }
-    
-    public void setOffset(long offset) {
-        _offset = offset;
-    }
-    
-    public String getTopic() { return _topic; }
-    public URI getURI () { return _uri; }
-    public int getPartition() { return _partition; }
-    public long getOffset() { return _offset; }
-    public String clientId() { return _clientId; }
-
-    public boolean isValidOffset() {
-        return _offset >= 0;
-    }
-    
-    @Override
-    public boolean equals(Object o) {
-        if (! (o instanceof KafkaETLRequest))
-            return false;
-        
-        KafkaETLRequest r = (KafkaETLRequest) o;
-        return this._topic.equals(r._topic) ||
-                    this._uri.equals(r._uri) ||
-                    this._partition == r._partition;
-    }
-
-    @Override
-    public int hashCode() {
-        return toString(0).hashCode();
-    }
-
-    @Override
-    public String toString() {
-        return toString(_offset);
-    }
-    
-
-    public String toString (long offset) {
-    
-        return 
-        _uri + DELIM +
-        _topic + DELIM +
-        _partition + DELIM +
-       offset;
-    }
-    
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/e176fcc7/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLUtils.java
----------------------------------------------------------------------
diff --git a/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLUtils.java b/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLUtils.java
deleted file mode 100644
index 02d79a1..0000000
--- a/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLUtils.java
+++ /dev/null
@@ -1,205 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * 
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.etl;
-
-
-import java.io.BufferedReader;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.FileNotFoundException;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.PrintWriter;
-import java.net.URL;
-import java.net.URLDecoder;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Enumeration;
-import java.util.List;
-import java.util.Properties;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.io.BytesWritable;
-
-public class KafkaETLUtils {
-
-       public static PathFilter PATH_FILTER = new PathFilter() {
-               @Override
-               public boolean accept(Path path) {
-                       return !path.getName().startsWith("_")
-                                       && !path.getName().startsWith(".");
-               }
-       };
-
-       
-       public static Path getLastPath(Path path, FileSystem fs) throws IOException {
-
-               FileStatus[] statuses = fs.listStatus(path, PATH_FILTER);
-
-               if (statuses.length == 0) {
-                       return path;
-               } else {
-                       Arrays.sort(statuses);
-                       return statuses[statuses.length - 1].getPath();
-               }
-       }
-
-       public static String getFileName(Path path) throws IOException {
-               String fullname = path.toUri().toString();
-               String[] parts = fullname.split(Path.SEPARATOR);
-               if (parts.length < 1)
-                       throw new IOException("Invalid path " + fullname);
-               return parts[parts.length - 1];
-       }
-
-       public static List<String> readText(FileSystem fs, String inputFile)
-                       throws IOException, FileNotFoundException {
-               Path path = new Path(inputFile);
-               return readText(fs, path);
-       }
-
-       public static List<String> readText(FileSystem fs, Path path)
-                       throws IOException, FileNotFoundException {
-               if (!fs.exists(path)) {
-                       throw new FileNotFoundException("File " + path + " doesn't exist!");
-               }
-               BufferedReader in = new BufferedReader(new InputStreamReader(
-                               fs.open(path)));
-               List<String> buf = new ArrayList<String>();
-               String line = null;
-
-               while ((line = in.readLine()) != null) {
-                       if (line.trim().length() > 0)
-                               buf.add(new String(line.trim()));
-               }
-               in.close();
-               return buf;
-       }
-
-       public static void writeText(FileSystem fs, Path outPath, String content)
-                       throws IOException {
-               long timestamp = System.currentTimeMillis();
-               String localFile = "/tmp/KafkaETL_tmp_" + timestamp;
-               PrintWriter writer = new PrintWriter(new FileWriter(localFile));
-               writer.println(content);
-               writer.close();
-
-               Path src = new Path(localFile);
-               fs.moveFromLocalFile(src, outPath);
-       }
-
-       public static Props getPropsFromJob(Configuration conf) {
-               String propsString = conf.get("kafka.etl.props");
-               if (propsString == null)
-                       throw new UndefinedPropertyException(
-                                       "The required property kafka.etl.props was not found in the Configuration.");
-               try {
-                       ByteArrayInputStream input = new ByteArrayInputStream(
-                                       propsString.getBytes("UTF-8"));
-                       Properties properties = new Properties();
-                       properties.load(input);
-                       return new Props(properties);
-               } catch (IOException e) {
-                       throw new RuntimeException("This is not possible!", e);
-               }
-       }
-
-        public static void setPropsInJob(Configuration conf, Props props)
-         {
-           ByteArrayOutputStream output = new ByteArrayOutputStream();
-           try
-           {
-             props.store(output);
-             conf.set("kafka.etl.props", new String(output.toByteArray(), "UTF-8"));
-           }
-           catch (IOException e)
-           {
-             throw new RuntimeException("This is not possible!", e);
-           }
-         }
-        
-       public static Props readProps(String file) throws IOException {
-               Path path = new Path(file);
-               FileSystem fs = path.getFileSystem(new Configuration());
-               if (fs.exists(path)) {
-                       InputStream input = fs.open(path);
-                       try {
-                               // wrap it up in another layer so that the user can override
-                               // properties
-                               Props p = new Props(input);
-                               return new Props(p);
-                       } finally {
-                               input.close();
-                       }
-               } else {
-                       return new Props();
-               }
-       }
-
-       public static String findContainingJar(
-                       @SuppressWarnings("rawtypes") Class my_class, ClassLoader loader) {
-               String class_file = my_class.getName().replaceAll("\\.", "/")
-                               + ".class";
-               return findContainingJar(class_file, loader);
-       }
-
-       public static String findContainingJar(String fileName, ClassLoader loader) {
-               try {
-                       for (@SuppressWarnings("rawtypes")
-                       Enumeration itr = loader.getResources(fileName); itr
-                                       .hasMoreElements();) {
-                               URL url = (URL) itr.nextElement();
-                               // logger.info("findContainingJar finds url:" + url);
-                               if ("jar".equals(url.getProtocol())) {
-                                       String toReturn = url.getPath();
-                                       if (toReturn.startsWith("file:")) {
-                                               toReturn = toReturn.substring("file:".length());
-                                       }
-                                       toReturn = URLDecoder.decode(toReturn, "UTF-8");
-                                       return toReturn.replaceAll("!.*$", "");
-                               }
-                       }
-               } catch (IOException e) {
-                       throw new RuntimeException(e);
-               }
-               return null;
-       }
-
-    public static byte[] getBytes(BytesWritable val) {
-        
-        byte[] buffer = val.getBytes();
-        
-        /* FIXME: remove the following part once the below gira is fixed
-         * https://issues.apache.org/jira/browse/HADOOP-6298
-         */
-        long len = val.getLength();
-        byte [] bytes = buffer;
-        if (len < buffer.length) {
-            bytes = new byte[(int) len];
-            System.arraycopy(buffer, 0, bytes, 0, (int)len);
-        }
-        
-        return bytes;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/e176fcc7/contrib/hadoop-consumer/src/main/java/kafka/etl/Props.java
----------------------------------------------------------------------
diff --git a/contrib/hadoop-consumer/src/main/java/kafka/etl/Props.java b/contrib/hadoop-consumer/src/main/java/kafka/etl/Props.java
deleted file mode 100644
index 71eb80f..0000000
--- a/contrib/hadoop-consumer/src/main/java/kafka/etl/Props.java
+++ /dev/null
@@ -1,458 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.etl;
-
-import java.io.BufferedInputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.lang.reflect.Constructor;
-import java.net.URI;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-
-import kafka.common.KafkaException;
-import org.apache.log4j.Logger;
-
-public class Props extends Properties {
-
-       private static final long serialVersionUID = 1L;
-       private static Logger logger = Logger.getLogger(Props.class);
-       
-       /**
-        * default constructor
-        */
-       public Props() {
-               super();
-       }
-
-       /**
-        * copy constructor 
-        * @param props
-        */
-       public Props(Props props) {
-               if (props != null) {
-                       this.put(props);
-               }
-       }
-       
-       /**
-        * construct props from a list of files
-        * @param files         paths of files
-        * @throws FileNotFoundException
-        * @throws IOException
-        */
-       public Props(String... files) throws FileNotFoundException, IOException {
-               this(Arrays.asList(files));
-       }
-
-       /**
-        * construct props from a list of files
-        * @param files         paths of files
-        * @throws FileNotFoundException
-        * @throws IOException
-        */
-       public Props(List<String> files) throws FileNotFoundException, IOException {
-
-               for (int i = 0; i < files.size(); i++) {
-                       InputStream input = new BufferedInputStream(new FileInputStream(
-                                       new File(files.get(i)).getAbsolutePath()));
-                       super.load(input);
-                       input.close();
-               }
-       }
-
-       /**
-        * construct props from a list of input streams
-        * @param inputStreams
-        * @throws IOException
-        */
-       public Props(InputStream... inputStreams) throws IOException {
-               for (InputStream stream : inputStreams)
-                       super.load(stream);
-       }
-
-       /**
-        * construct props from a list of maps
-        * @param props
-        */
-       public Props(Map<String, String>... props) {
-               for (int i = props.length - 1; i >= 0; i--)
-                       super.putAll(props[i]);
-       }
-
-       /**
-        * construct props from a list of Properties
-        * @param properties
-        */
-       public Props(Properties... properties) {
-               for (int i = properties.length - 1; i >= 0; i--){
-                       this.put(properties[i]);
-               }
-       }
-
-       /**
-        * build props from a list of strings and interpret them as
-        * key, value, key, value,....
-        * 
-        * @param args
-        * @return props
-        */
-       @SuppressWarnings("unchecked")
-       public static Props of(String... args) {
-               if (args.length % 2 != 0)
-                       throw new KafkaException(
-                                       "Must have an equal number of keys and values.");
-               Map<String, String> vals = new HashMap<String, String>(args.length / 2);
-               for (int i = 0; i < args.length; i += 2)
-                       vals.put(args[i], args[i + 1]);
-               return new Props(vals);
-       }
-
-       /**
-        * Put the given Properties into the Props. 
-        * 
-        * @param properties
-        *            The properties to put
-        * 
-        */
-       public void put(Properties properties) {
-               for (String propName : properties.stringPropertyNames()) {
-                       super.put(propName, properties.getProperty(propName));
-               }
-       }
-
-       /**
-        * get property of "key" and split the value by " ," 
-        * @param key           
-        * @return list of values
-        */
-       public List<String> getStringList(String key) {
-               return getStringList(key, "\\s*,\\s*");
-       }
-
-       /**
-        * get property of "key" and split the value by "sep"
-        * @param key
-        * @param sep
-        * @return string list of values
-        */
-       public List<String> getStringList(String key, String sep) {
-               String val =  super.getProperty(key);
-               if (val == null || val.trim().length() == 0)
-                       return Collections.emptyList();
-
-               if (containsKey(key))
-                       return Arrays.asList(val.split(sep));
-               else
-                       throw new UndefinedPropertyException("Missing required property '"
-                                       + key + "'");
-       }
-
-       /**
-        * get string list with default value. default delimiter is ","
-        * @param key
-        * @param defaultValue
-        * @return string list of values
-        */
-       public List<String> getStringList(String key, List<String> defaultValue) {
-               if (containsKey(key))
-                       return getStringList(key);
-               else
-                       return defaultValue;
-       }
-
-       /**
-        * get string list with default value
-        * @param key
-        * @param defaultValue
-        * @return string list of values
-        */
-       public List<String> getStringList(String key, List<String> defaultValue,
-                       String sep) {
-               if (containsKey(key))
-                       return getStringList(key, sep);
-               else
-                       return defaultValue;
-       }
-
-       @SuppressWarnings("unchecked")
-       protected <T> T getValue(String key, T defaultValue) 
-       throws Exception {
-               
-               if (containsKey(key)) {
-                       Object value = super.get(key);
-                       if (value.getClass().isInstance(defaultValue)) {
-                               return (T)value;
-                       } else if (value instanceof String) {
-                               // call constructor(String) to initialize it
-                               @SuppressWarnings("rawtypes")
-                               Constructor ct = defaultValue.getClass().getConstructor(String.class);
-                               String v = ((String)value).trim();
-                               Object ret = ct.newInstance(v);
-                               return (T) ret;
-                       }
-                       else throw new UndefinedPropertyException ("Property " + key + 
-                                       ": cannot convert value of " + value.getClass().getName() + 
-                                       " to " + defaultValue.getClass().getName());
-               }
-               else {
-                       return defaultValue;
-               }
-       }
-
-       @SuppressWarnings("unchecked")
-       protected <T> T getValue(String key, Class<T> mclass) 
-       throws Exception {
-               
-               if (containsKey(key)) {
-                       Object value = super.get(key);
-                       if (value.getClass().equals(mclass)) {
-                               return (T)value;
-                       } else if (value instanceof String) {
-                               // call constructor(String) to initialize it
-                               @SuppressWarnings("rawtypes")
-                               Constructor ct = mclass.getConstructor(String.class);
-                               String v = ((String)value).trim();
-                               Object ret = ct.newInstance(v);
-                               return (T) ret;
-                       }
-                       else throw new UndefinedPropertyException ("Property " + key + 
-                                       ": cannot convert value of " + value.getClass().getName() + 
-                                       " to " + mclass.getClass().getName());
-               }
-               else {
-                       throw new UndefinedPropertyException ("Missing required property '"
-                                       + key + "'");
-               }
-       }
-
-       /**
-        * get boolean value with default value
-        * @param key
-        * @param defaultValue
-        * @return boolean value
-        * @throws Exception    if value is not of type boolean or string
-        */
-       public Boolean getBoolean(String key, Boolean defaultValue) 
-       throws Exception {
-               return getValue (key, defaultValue);
-       }
-
-       /**
-        * get boolean value
-        * @param key
-        * @return boolean value
-        * @throws Exception    if value is not of type boolean or string or 
-        *                                                                              if value doesn't exist
-        */
-       public Boolean getBoolean(String key) throws Exception {
-               return getValue (key, Boolean.class);
-       }
-
-       /**
-        * get long value with default value
-        * @param name
-        * @param defaultValue
-        * @return long value
-        * @throws Exception    if value is not of type long or string
-        */
-       public Long getLong(String name, Long defaultValue) 
-       throws Exception {
-               return getValue(name, defaultValue);
-       }
-
-       /**
-        * get long value
-        * @param name
-        * @return long value
-        * @throws Exception    if value is not of type long or string or 
-        *                                                                              if value doesn't exist
-        */
-       public Long getLong(String name) throws Exception  {
-               return getValue (name, Long.class);
-       }
-
-       /**
-        * get integer value with default value
-        * @param name
-        * @param defaultValue
-        * @return integer value
-        * @throws Exception    if value is not of type integer or string
-        */
-       public Integer getInt(String name, Integer defaultValue) 
-       throws Exception  {
-               return getValue(name, defaultValue);
-       }
-
-       /**
-        * get integer value
-        * @param name
-        * @return integer value
-        * @throws Exception    if value is not of type integer or string or 
-        *                                                                              if value doesn't exist
-        */
-       public Integer getInt(String name) throws Exception {
-               return getValue (name, Integer.class);
-       }
-
-       /**
-        * get double value with default value
-        * @param name
-        * @param defaultValue
-        * @return double value
-        * @throws Exception    if value is not of type double or string
-        */
-       public Double getDouble(String name, double defaultValue) 
-       throws Exception {
-               return getValue(name, defaultValue);
-       }
-
-       /**
-        * get double value
-        * @param name
-        * @return double value
-        * @throws Exception    if value is not of type double or string or 
-        *                                                                              if value doesn't exist
-        */
-       public double getDouble(String name) throws Exception {
-               return getValue(name, Double.class);
-       }
-
-       /**
-        * get URI value with default value
-        * @param name
-        * @param defaultValue
-        * @return URI value
-        * @throws Exception    if value is not of type URI or string 
-        */
-       public URI getUri(String name, URI defaultValue) throws Exception {
-               return getValue(name, defaultValue);
-       }
-
-       /**
-        * get URI value
-        * @param name
-        * @param defaultValue
-        * @return URI value
-        * @throws Exception    if value is not of type URI or string 
-        */
-       public URI getUri(String name, String defaultValue) 
-       throws Exception {
-               URI defaultV = new URI(defaultValue);
-               return getValue(name, defaultV);
-       }
-
-       /**
-        * get URI value
-        * @param name
-        * @return URI value
-        * @throws Exception    if value is not of type URI or string or 
-        *                                                                              if value doesn't exist
-        */
-       public URI getUri(String name) throws Exception {
-               return getValue(name, URI.class);
-       }
-
-       /**
-        * compare two props 
-        * @param p
-        * @return true or false
-        */
-       public boolean equalsProps(Props p) {
-               if (p == null) {
-                       return false;
-               }
-
-               final Set<String> myKeySet = getKeySet();
-               for (String s : myKeySet) {
-                       if (!get(s).equals(p.get(s))) {
-                               return false;
-                       }
-               }
-
-               return myKeySet.size() == p.getKeySet().size();
-       }
-
-
-       /**
-        * Get a map of all properties by string prefix
-        * 
-        * @param prefix
-        *            The string prefix
-        */
-       public Map<String, String> getMapByPrefix(String prefix) {
-               Map<String, String> values = new HashMap<String, String>();
-
-               for (String key : super.stringPropertyNames()) {
-                       if (key.startsWith(prefix)) {
-                               values.put(key.substring(prefix.length()), super.getProperty(key));
-                       }
-               }
-               return values;
-       }
-
-    /**
-     * Store all properties
-     * 
-     * @param out The stream to write to
-     * @throws IOException If there is an error writing
-     */
-    public void store(OutputStream out) throws IOException {
-           super.store(out, null);
-    }
-    
-    /**
-     * get all property names
-     * @return set of property names
-     */
-       public Set<String> getKeySet() {
-               return super.stringPropertyNames();
-       }
-
-       /**
-        * log properties
-        * @param comment
-        */
-       public void logProperties(String comment) {
-               logger.info(comment);
-
-               for (String key : getKeySet()) {
-                       logger.info("  key=" + key + " value=" + get(key));
-               }
-       }
-
-       /**
-        * clone a Props
-        * @param p
-        * @return props
-        */
-       public static Props clone(Props p) {
-               return new Props(p);
-       }
-
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/e176fcc7/contrib/hadoop-consumer/src/main/java/kafka/etl/UndefinedPropertyException.java
----------------------------------------------------------------------
diff --git a/contrib/hadoop-consumer/src/main/java/kafka/etl/UndefinedPropertyException.java b/contrib/hadoop-consumer/src/main/java/kafka/etl/UndefinedPropertyException.java
deleted file mode 100644
index 9278122..0000000
--- a/contrib/hadoop-consumer/src/main/java/kafka/etl/UndefinedPropertyException.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.etl;
-
-public class UndefinedPropertyException extends RuntimeException {
-
-       private static final long serialVersionUID = 1;
-
-       public UndefinedPropertyException(String message) {
-               super(message);
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/e176fcc7/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/DataGenerator.java
----------------------------------------------------------------------
diff --git a/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/DataGenerator.java b/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/DataGenerator.java
deleted file mode 100644
index d27a511..0000000
--- a/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/DataGenerator.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * 
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.etl.impl;
-
-
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Properties;
-import java.util.Random;
-import kafka.etl.KafkaETLKey;
-import kafka.etl.KafkaETLRequest;
-import kafka.etl.Props;
-import kafka.javaapi.producer.Producer;
-import kafka.producer.ProducerConfig;
-import kafka.producer.KeyedMessage;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.mapred.JobConf;
-
-import static org.apache.kafka.common.utils.Utils.formatAddress;
-
-/**
- * Use this class to produce test events to Kafka server. Each event contains a
- * random timestamp in text format.
- */
-@SuppressWarnings("deprecation")
-public class DataGenerator {
-
-       protected final static Random RANDOM = new Random(
-                       System.currentTimeMillis());
-
-       protected Props _props;
-       protected Producer _producer = null;
-       protected URI _uri = null;
-       protected String _topic;
-       protected int _count;
-       protected String _offsetsDir;
-       protected final int TCP_BUFFER_SIZE = 300 * 1000;
-       protected final int CONNECT_TIMEOUT = 20000; // ms
-       protected final int RECONNECT_INTERVAL = Integer.MAX_VALUE; // ms
-
-       public DataGenerator(String id, Props props) throws Exception {
-               _props = props;
-               _topic = props.getProperty("kafka.etl.topic");
-               System.out.println("topics=" + _topic);
-               _count = props.getInt("event.count");
-
-               _offsetsDir = _props.getProperty("input");
-               
-               // initialize kafka producer to generate count events
-               String serverUri = _props.getProperty("kafka.server.uri");
-               _uri = new URI (serverUri);
-               
-               System.out.println("server uri:" + _uri.toString());
-        Properties producerProps = new Properties();
-        producerProps.put("metadata.broker.list", formatAddress(_uri.getHost(), _uri.getPort()));
-        producerProps.put("send.buffer.bytes", String.valueOf(TCP_BUFFER_SIZE));
-        producerProps.put("connect.timeout.ms", String.valueOf(CONNECT_TIMEOUT));
-        producerProps.put("reconnect.interval", String.valueOf(RECONNECT_INTERVAL));
-        
-               _producer = new Producer(new ProducerConfig(producerProps));
-                       
-       }
-
-       public void run() throws Exception {
-
-               List<KeyedMessage> list = new ArrayList<KeyedMessage>();
-               for (int i = 0; i < _count; i++) {
-                       Long timestamp = RANDOM.nextLong();
-                       if (timestamp < 0) timestamp = -timestamp;
-                       byte[] bytes = timestamp.toString().getBytes("UTF8");
-            list.add(new KeyedMessage<Integer, byte[]>(_topic, null, bytes));
-               }
-               // send events
-               System.out.println(" send " + list.size() + " " + _topic + " count events to " + _uri);
-               _producer.send(list);
-
-               // close the producer
-               _producer.close();
-               
-               // generate offset files
-               generateOffsets();
-       }
-
-    protected void generateOffsets() throws Exception {
-        JobConf conf = new JobConf();
-        conf.set("hadoop.job.ugi", _props.getProperty("hadoop.job.ugi"));
-        conf.setCompressMapOutput(false);
-        Path outPath = new Path(_offsetsDir + Path.SEPARATOR + "1.dat");
-        FileSystem fs = outPath.getFileSystem(conf);
-        if (fs.exists(outPath)) fs.delete(outPath);
-        
-        KafkaETLRequest request =
-            new KafkaETLRequest(_topic, "tcp://" + formatAddress(_uri.getHost(), _uri.getPort()), 0);
-
-        System.out.println("Dump " + request.toString() + " to " + outPath.toUri().toString());
-        byte[] bytes = request.toString().getBytes("UTF-8");
-        KafkaETLKey dummyKey = new KafkaETLKey();
-        SequenceFile.setCompressionType(conf, SequenceFile.CompressionType.NONE);
-        SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, outPath, 
-                                        KafkaETLKey.class, BytesWritable.class);
-        writer.append(dummyKey, new BytesWritable(bytes));
-        writer.close();
-    }
-    
-       public static void main(String[] args) throws Exception {
-
-               if (args.length < 1)
-                       throw new Exception("Usage: - config_file");
-
-               Props props = new Props(args[0]);
-               DataGenerator job = new DataGenerator("DataGenerator", props);
-               job.run();
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/e176fcc7/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/SimpleKafkaETLJob.java
----------------------------------------------------------------------
diff --git a/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/SimpleKafkaETLJob.java b/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/SimpleKafkaETLJob.java
deleted file mode 100644
index d269704..0000000
--- a/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/SimpleKafkaETLJob.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * 
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.etl.impl;
-
-import kafka.etl.KafkaETLInputFormat;
-import kafka.etl.KafkaETLJob;
-import kafka.etl.Props;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RunningJob;
-import org.apache.hadoop.mapred.TextOutputFormat;
-
-/**
- * This is a simple Kafka ETL job which pull text events generated by
- * DataGenerator and store them in hdfs
- */
-@SuppressWarnings("deprecation")
-public class SimpleKafkaETLJob {
-
-    protected String _name;
-    protected Props _props;
-    protected String _input;
-    protected String _output;
-    protected String _topic;
-    
-       public SimpleKafkaETLJob(String name, Props props) throws Exception {
-               _name = name;
-               _props = props;
-               
-               _input = _props.getProperty("input");
-               _output = _props.getProperty("output");
-               
-               _topic = props.getProperty("kafka.etl.topic");
-       }
-
-
-       protected JobConf createJobConf() throws Exception {
-               JobConf jobConf = KafkaETLJob.createJobConf("SimpleKafakETL", _topic, _props, getClass());
-               
-               jobConf.setMapperClass(SimpleKafkaETLMapper.class);
-               KafkaETLInputFormat.setInputPaths(jobConf, new Path(_input));
-               
-               jobConf.setOutputKeyClass(LongWritable.class);
-               jobConf.setOutputValueClass(Text.class);
-               jobConf.setOutputFormat(TextOutputFormat.class);
-               TextOutputFormat.setCompressOutput(jobConf, false);
-               Path output = new Path(_output);
-               FileSystem fs = output.getFileSystem(jobConf);
-               if (fs.exists(output)) fs.delete(output);
-               TextOutputFormat.setOutputPath(jobConf, output);
-               
-               jobConf.setNumReduceTasks(0);
-               return jobConf;
-       }
-       
-    public void execute () throws Exception {
-        JobConf conf = createJobConf();
-        RunningJob runningJob = new JobClient(conf).submitJob(conf);
-        String id = runningJob.getJobID();
-        System.out.println("Hadoop job id=" + id);
-        runningJob.waitForCompletion();
-        
-        if (!runningJob.isSuccessful()) 
-            throw new Exception("Hadoop ETL job failed! Please check status on http://"
-                                         + conf.get("mapred.job.tracker") + "/jobdetails.jsp?jobid=" + id);
-    }
-
-       /**
-        * for testing only
-        * 
-        * @param args
-        * @throws Exception
-        */
-       public static void main(String[] args) throws Exception {
-
-               if (args.length < 1)
-                       throw new Exception("Usage: - config_file");
-
-               Props props = new Props(args[0]);
-               SimpleKafkaETLJob job = new SimpleKafkaETLJob("SimpleKafkaETLJob",
-                               props);
-               job.execute();
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/e176fcc7/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/SimpleKafkaETLMapper.java
----------------------------------------------------------------------
diff --git a/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/SimpleKafkaETLMapper.java b/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/SimpleKafkaETLMapper.java
deleted file mode 100644
index 0fea5db..0000000
--- a/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/SimpleKafkaETLMapper.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * 
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.etl.impl;
-
-import kafka.etl.KafkaETLKey;
-import kafka.etl.KafkaETLUtils;
-import kafka.message.Message;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reporter;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-/**
- * Simple implementation of KafkaETLMapper. It assumes that 
- * input data are text timestamp (long).
- */
-@SuppressWarnings("deprecation")
-public class SimpleKafkaETLMapper implements
-Mapper<KafkaETLKey, BytesWritable, LongWritable, Text> {
-
-    protected long _count = 0;
-    
-       protected Text getData(Message message) throws IOException {
-               ByteBuffer buf = message.payload();
-               if(buf == null)
-                 return new Text();
-               
-               byte[] array = new byte[buf.limit()];
-               buf.get(array);
-               
-               Text text = new Text( new String(array, "UTF8"));
-               return text;
-       }
-
-
-    @Override
-    public void map(KafkaETLKey key, BytesWritable val,
-            OutputCollector<LongWritable, Text> collector,
-            Reporter reporter) throws IOException {
-        
-         
-        byte[] bytes = KafkaETLUtils.getBytes(val);
-        
-        //check the checksum of message
-        Message message = new Message(ByteBuffer.wrap(bytes));
-        long checksum = key.getChecksum();
-        if (checksum != message.checksum()) 
-            throw new IOException ("Invalid message checksum " 
-                                            + message.checksum() + ". Expected " + key + ".");
-        Text data = getData (message);
-        _count ++;
-           
-        collector.collect(new LongWritable (_count), data);
-
-    }
-
-
-    @Override
-    public void configure(JobConf arg0) {
-        // TODO Auto-generated method stub
-        
-    }
-
-
-    @Override
-    public void close() throws IOException {
-        // TODO Auto-generated method stub
-        
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/e176fcc7/contrib/hadoop-consumer/test/test.properties
----------------------------------------------------------------------
diff --git a/contrib/hadoop-consumer/test/test.properties b/contrib/hadoop-consumer/test/test.properties
deleted file mode 100644
index cdea8cc..0000000
--- a/contrib/hadoop-consumer/test/test.properties
+++ /dev/null
@@ -1,42 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-# 
-#    http://www.apache.org/licenses/LICENSE-2.0
-# 
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# name of test topic
-kafka.etl.topic=SimpleTestEvent
-
-# hdfs location of jars
-hdfs.default.classpath.dir=/tmp/kafka/lib
-
-# number of test events to be generated
-event.count=1000
-
-# hadoop id and group
-hadoop.job.ugi=kafka,hadoop
-
-# kafka server uri
-kafka.server.uri=tcp://localhost:9092
-
-# hdfs location of input directory 
-input=/tmp/kafka/data
-
-# hdfs location of output directory
-output=/tmp/kafka/output
-
-# limit the number of events to be fetched;
-# value -1 means no limitation
-kafka.request.limit=-1
-
-# kafka parameters
-client.buffer.size=1048576
-client.so.timeout=60000
