KAFKA-2783; Drop outdated hadoop contrib modules

Author: Grant Henke <granthe...@gmail.com>

Reviewers: Gwen Shapira

Closes #466 from granthenke/drop-contrib

(cherry picked from commit 69af573b35f04657e31f60e636aba19ffa0b2c84)
Signed-off-by: Gwen Shapira <csh...@gmail.com>

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e176fcc7
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e176fcc7
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e176fcc7

Branch: refs/heads/0.9.0
Commit: e176fcc7fb146bf9be2d8d8f2a4f5e02f4753731
Parents: f22ea29
Author: Grant Henke <granthe...@gmail.com>
Authored: Mon Nov 9 11:02:46 2015 -0800
Committer: Gwen Shapira <csh...@gmail.com>
Committed: Mon Nov 9 11:03:07 2015 -0800

----------------------------------------------------------------------
 README.md                                        |   2 +-
 build.gradle                                     |  51 +--
 contrib/LICENSE                                  |   1 -
 contrib/NOTICE                                   |   1 -
 contrib/hadoop-consumer/README                   |  66 ---
 contrib/hadoop-consumer/copy-jars.sh             |  69 ---
 contrib/hadoop-consumer/hadoop-setup.sh          |  20 -
 contrib/hadoop-consumer/run-class.sh             |  65 ---
 .../main/java/kafka/etl/KafkaETLContext.java     | 270 -----
 .../java/kafka/etl/KafkaETLInputFormat.java      |  78 ----
 .../src/main/java/kafka/etl/KafkaETLJob.java     | 172 -------
 .../src/main/java/kafka/etl/KafkaETLKey.java     | 104 -----
 .../java/kafka/etl/KafkaETLRecordReader.java     | 180 --------
 .../main/java/kafka/etl/KafkaETLRequest.java     | 129 ------
 .../src/main/java/kafka/etl/KafkaETLUtils.java   | 205 ---------
 .../src/main/java/kafka/etl/Props.java           | 458 -------
 .../kafka/etl/UndefinedPropertyException.java    |  28 --
 .../main/java/kafka/etl/impl/DataGenerator.java  | 134 ------
 .../java/kafka/etl/impl/SimpleKafkaETLJob.java   | 104 -----
 .../kafka/etl/impl/SimpleKafkaETLMapper.java     |  91 ----
 contrib/hadoop-consumer/test/test.properties     |  42 --
 contrib/hadoop-producer/README.md                |  94 ----
 .../kafka/bridge/examples/TextPublisher.java     |  66 ---
 .../kafka/bridge/hadoop/KafkaOutputFormat.java   | 144 ------
 .../kafka/bridge/hadoop/KafkaRecordWriter.java   |  88 ----
 .../java/kafka/bridge/pig/AvroKafkaStorage.java  | 115 -----
 docs/api.html                                    |  20 +-
 docs/documentation.html                          |   2 +-
 docs/implementation.html                         |  88 ++--
 settings.gradle                                  |   4 +-
 30 files changed, 54 insertions(+), 2837 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/e176fcc7/README.md
----------------------------------------------------------------------
diff --git a/README.md b/README.md
index dc15923..6baa17e 100644
--- a/README.md
+++ b/README.md
@@ -60,7 +60,7 @@ The release file can be found inside ./core/build/distributions/.
     ./gradlew -PscalaVersion=2.11.7 releaseTarGz
 
 ### Running a task for a specific project ###
-This is for 'core', 'contrib:hadoop-consumer', 'contrib:hadoop-producer', 'examples' and 'clients'
+This is for 'core', 'examples' and 'clients'
 
     ./gradlew core:jar
     ./gradlew core:test

http://git-wip-us.apache.org/repos/asf/kafka/blob/e176fcc7/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 4ea0ee3..f9fd42a 100644
--- a/build.gradle
+++ b/build.gradle
@@ -229,7 +229,7 @@ for ( sv in ['2_10_5', '2_11_7'] ) {
 }
 
 def connectPkgs = ['connect:api', 'connect:runtime', 'connect:json', 'connect:file']
-def pkgs = ['clients', 'examples', 'contrib:hadoop-consumer', 'contrib:hadoop-producer', 'log4j-appender', 'tools', 'streams'] + connectPkgs
+def pkgs = ['clients', 'examples', 'log4j-appender', 'tools', 'streams'] + connectPkgs
 
 tasks.create(name: "jarConnect", dependsOn: connectPkgs.collect { it + ":jar" }) {}
 tasks.create(name: "jarAll", dependsOn: ['jar_core_2_10_5', 'jar_core_2_11_7'] + pkgs.collect { it + ":jar" }) { }
@@ -376,55 +376,6 @@ project(':core') {
   }
 }
 
-project(':contrib:hadoop-consumer') {
-  archivesBaseName = "kafka-hadoop-consumer"
-
-  dependencies {
-    compile project(':core')
-    compile "org.apache.avro:avro:1.4.0"
-    compile "org.apache.pig:pig:0.8.0"
-    compile "commons-logging:commons-logging:1.0.4"
-    compile "org.codehaus.jackson:jackson-core-asl:1.5.5"
-    compile "org.codehaus.jackson:jackson-mapper-asl:1.5.5"
-    compile "org.apache.hadoop:hadoop-core:0.20.2"
-  }
-
-  configurations {
-    // manually excludes some unnecessary dependencies
-    compile.exclude module: 'javax'
-    compile.exclude module: 'jms'
-    compile.exclude module: 'jmxri'
-    compile.exclude module: 'jmxtools'
-    compile.exclude module: 'mail'
-    compile.exclude module: 'netty'
-  }
-}
-
-project(':contrib:hadoop-producer') {
-  archivesBaseName = "kafka-hadoop-producer"
-
-  dependencies {
-    compile project(':core')
-    compile("org.apache.avro:avro:1.4.0") { force = true }
-    compile "org.apache.pig:pig:0.8.0"
-    compile "commons-logging:commons-logging:1.0.4"
-    compile "org.codehaus.jackson:jackson-core-asl:1.5.5"
-    compile "org.codehaus.jackson:jackson-mapper-asl:1.5.5"
-    compile "org.apache.hadoop:hadoop-core:0.20.2"
-    compile "org.apache.pig:piggybank:0.12.0"
-  }
-
-  configurations {
-    // manually excludes some unnecessary dependencies
-    compile.exclude module: 'javax'
-    compile.exclude module: 'jms'
-    compile.exclude module: 'jmxri'
-    compile.exclude module: 'jmxtools'
-    compile.exclude module: 'mail'
-    compile.exclude module: 'netty'
-  }
-}
-
 project(':examples') {
   archivesBaseName = "kafka-examples"
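With the contrib projects gone from 'pkgs', aggregate tasks such as jarAll no
longer build the Hadoop modules, and per-project tasks resolve only for the
retained modules. A minimal sketch of what still works after this commit, in
the README's own style (hypothetical invocations, not part of the commit):

    ./gradlew core:jar
    ./gradlew clients:jar
    ./gradlew examples:test

Targets like "./gradlew contrib:hadoop-consumer:jar" no longer resolve once
the contrib projects are removed (settings.gradle is updated accordingly in
this commit).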
http://git-wip-us.apache.org/repos/asf/kafka/blob/e176fcc7/contrib/LICENSE
----------------------------------------------------------------------
diff --git a/contrib/LICENSE b/contrib/LICENSE
deleted file mode 120000
index ea5b606..0000000
--- a/contrib/LICENSE
+++ /dev/null
@@ -1 +0,0 @@
-../LICENSE
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/e176fcc7/contrib/NOTICE
----------------------------------------------------------------------
diff --git a/contrib/NOTICE b/contrib/NOTICE
deleted file mode 120000
index 7e1b82f..0000000
--- a/contrib/NOTICE
+++ /dev/null
@@ -1 +0,0 @@
-../NOTICE
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/e176fcc7/contrib/hadoop-consumer/README
----------------------------------------------------------------------
diff --git a/contrib/hadoop-consumer/README b/contrib/hadoop-consumer/README
deleted file mode 100644
index 54a2666..0000000
--- a/contrib/hadoop-consumer/README
+++ /dev/null
@@ -1,66 +0,0 @@
-This is a Hadoop job that pulls data from a kafka server into HDFS.
-
-It requires the following inputs from a configuration file
-(test/test.properties is an example)
-
-kafka.etl.topic : the topic to be fetched;
-
-input : input directory containing topic offsets;
-        it can be generated by DataGenerator;
-        the number of files in this directory determines the
-        number of mappers in the hadoop job;
-
-output : output directory containing kafka data and updated
-         topic offsets;
-
-kafka.request.limit : it is used to limit the number of events fetched.
-
-KafkaETLRecordReader is a record reader associated with KafkaETLInputFormat.
-It fetches kafka data from the server. It starts from provided offsets
-(specified by "input") and stops when it reaches the largest available offsets
-or the specified limit (specified by "kafka.request.limit").
-
-KafkaETLJob contains some helper functions to initialize job configuration.
-
-SimpleKafkaETLJob sets up job properties and submits the Hadoop job.
-
-SimpleKafkaETLMapper dumps kafka data into hdfs.
-
-HOW TO RUN:
-In order to run this, make sure the HADOOP_HOME environment variable points to
-your hadoop installation directory.
-
-1. Compile using "sbt" to create a package for hadoop consumer code.
-   ./sbt package
-
-2. Run the hadoop-setup.sh script that enables write permission on the
-   required HDFS directory.
-
-3. Produce test events in server and generate offset files
-   1) Start kafka server [ Follow the quick start -
-      http://sna-projects.com/kafka/quickstart.php ]
-
-   2) Update test/test.properties to change the following parameters:
-      kafka.etl.topic : topic name
-      event.count : number of events to be generated
-      kafka.server.uri : kafka server uri;
-      input : hdfs directory of offset files
-
-   3) Produce test events to Kafka server and generate offset files
-      ./run-class.sh kafka.etl.impl.DataGenerator test/test.properties
-
-4. Fetch generated topic into HDFS:
-   1) Update test/test.properties to change the following parameters:
-      hadoop.job.ugi : id and group
-      input : input location
-      output : output location
-      kafka.request.limit : limit the number of events to be fetched;
-                            -1 means no limitation.
-      hdfs.default.classpath.dir : hdfs location of jars
-
-   2) Copy jars into hdfs
-      ./copy-jars.sh ${hdfs.default.classpath.dir}
-
-   3) Fetch data
-      ./run-class.sh kafka.etl.impl.SimpleKafkaETLJob test/test.properties
-
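For reference, the test/test.properties file the README treats as the
canonical example is also deleted by this commit (42 lines per the diffstat
above; its hunk falls outside the portion quoted here). A minimal sketch of
such a file, assuming only the keys the README itself names; every value
below is a placeholder, not the deleted file's actual contents:

    # hypothetical test.properties for the removed hadoop-consumer job
    kafka.etl.topic=test-topic
    kafka.server.uri=tcp://localhost:9092
    event.count=1000
    input=/tmp/kafka/input
    output=/tmp/kafka/output
    kafka.request.limit=-1
    hdfs.default.classpath.dir=/tmp/kafka/jars
    hadoop.job.ugi=hadoop,hadoop

Per the numbered steps above, DataGenerator reads kafka.etl.topic,
kafka.server.uri, event.count, and input to produce events and write offset
files; SimpleKafkaETLJob then uses input, output, and kafka.request.limit to
pull the data into HDFS.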
http://git-wip-us.apache.org/repos/asf/kafka/blob/e176fcc7/contrib/hadoop-consumer/copy-jars.sh
----------------------------------------------------------------------
diff --git a/contrib/hadoop-consumer/copy-jars.sh b/contrib/hadoop-consumer/copy-jars.sh
deleted file mode 100755
index e5de1dd..0000000
--- a/contrib/hadoop-consumer/copy-jars.sh
+++ /dev/null
@@ -1,69 +0,0 @@
-#!/bin/bash
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-if [ $# -lt 1 ];
-then
-  echo "USAGE: $0 dir"
-  exit 1
-fi
-
-base_dir=$(dirname $0)/../..
-
-hadoop=${HADOOP_HOME}/bin/hadoop
-
-echo "$hadoop fs -rmr $1"
-$hadoop fs -rmr $1
-
-echo "$hadoop fs -mkdir $1"
-$hadoop fs -mkdir $1
-
-# include kafka jars
-for file in $base_dir/contrib/hadoop-consumer/target/scala_2.8.0/*.jar;
-do
-  echo "$hadoop fs -put $file $1/"
-  $hadoop fs -put $file $1/
-done
-
-# include kafka jars
-echo "$hadoop fs -put $base_dir/core/target/scala_2.8.0/kafka-*.jar; $1/"
-$hadoop fs -put $base_dir/core/target/scala_2.8.0/kafka-*.jar $1/
-
-# include core lib jars
-for file in $base_dir/core/lib/*.jar;
-do
-  echo "$hadoop fs -put $file $1/"
-  $hadoop fs -put $file $1/
-done
-
-for file in $base_dir/core/lib_managed/scala_2.8.0/compile/*.jar;
-do
-  echo "$hadoop fs -put $file $1/"
-  $hadoop fs -put $file $1/
-done
-
-# include scala library jar
-echo "$hadoop fs -put $base_dir/project/boot/scala-2.8.0/lib/scala-library.jar; $1/"
-$hadoop fs -put $base_dir/project/boot/scala-2.8.0/lib/scala-library.jar $1/
-
-local_dir=$(dirname $0)
-
-# include hadoop-consumer jars
-for file in $local_dir/lib/*.jar;
-do
-  echo "$hadoop fs -put $file $1/"
-  $hadoop fs -put $file $1/
-done
-

http://git-wip-us.apache.org/repos/asf/kafka/blob/e176fcc7/contrib/hadoop-consumer/hadoop-setup.sh
----------------------------------------------------------------------
diff --git a/contrib/hadoop-consumer/hadoop-setup.sh b/contrib/hadoop-consumer/hadoop-setup.sh
deleted file mode 100755
index c855e66..0000000
--- a/contrib/hadoop-consumer/hadoop-setup.sh
+++ /dev/null
@@ -1,20 +0,0 @@
-#!/bin/bash
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-hadoop=${HADOOP_HOME}/bin/hadoop
-
-$hadoop fs -chmod ugoa+w /tmp
-

http://git-wip-us.apache.org/repos/asf/kafka/blob/e176fcc7/contrib/hadoop-consumer/run-class.sh
----------------------------------------------------------------------
diff --git a/contrib/hadoop-consumer/run-class.sh b/contrib/hadoop-consumer/run-class.sh
deleted file mode 100755
index bfb4744..0000000
--- a/contrib/hadoop-consumer/run-class.sh
+++ /dev/null
@@ -1,65 +0,0 @@
-#!/bin/bash
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -if [ $# -lt 1 ]; -then - echo "USAGE: $0 classname [opts]" - exit 1 -fi - -base_dir=$(dirname $0)/../.. - -# include kafka jars -for file in $base_dir/core/target/scala_2.8.0/kafka-*.jar -do - CLASSPATH=$CLASSPATH:$file -done - -for file in $base_dir/contrib/hadoop-consumer/lib_managed/scala_2.8.0/compile/*.jar; -do - CLASSPATH=$CLASSPATH:$file -done - -local_dir=$(dirname $0) - -# include hadoop-consumer jars -for file in $base_dir/contrib/hadoop-consumer/target/scala_2.8.0/*.jar; -do - CLASSPATH=$CLASSPATH:$file -done - -for file in $base_dir/contrib/hadoop-consumer/lib/*.jar; -do - CLASSPATH=$CLASSPATH:$file -done - -CLASSPATH=$CLASSPATH:$base_dir/project/boot/scala-2.8.0/lib/scala-library.jar - -echo $CLASSPATH - -CLASSPATH=dist:$CLASSPATH:${HADOOP_HOME}/conf - -#if [ -z "$KAFKA_OPTS" ]; then -# KAFKA_OPTS="-Xmx512M -server -Dcom.sun.management.jmxremote" -#fi - -if [ -z "$JAVA_HOME" ]; then - JAVA="java" -else - JAVA="$JAVA_HOME/bin/java" -fi - -$JAVA $KAFKA_OPTS -cp $CLASSPATH $@ http://git-wip-us.apache.org/repos/asf/kafka/blob/e176fcc7/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java ---------------------------------------------------------------------- diff --git a/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java b/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java deleted file mode 100644 index c9b9018..0000000 --- a/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java +++ /dev/null @@ -1,270 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package kafka.etl; - - -import java.io.IOException; -import java.net.URI; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Iterator; -import java.util.Map; -import kafka.api.FetchRequest; -import kafka.api.FetchRequestBuilder; -import kafka.api.PartitionOffsetRequestInfo; -import kafka.common.TopicAndPartition; -import kafka.javaapi.FetchResponse; -import kafka.javaapi.OffsetRequest; -import kafka.javaapi.consumer.SimpleConsumer; -import kafka.javaapi.message.ByteBufferMessageSet; -import kafka.message.MessageAndOffset; -import org.apache.hadoop.io.BytesWritable; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.OutputCollector; -import org.apache.hadoop.mapred.Reporter; -import org.apache.hadoop.mapred.lib.MultipleOutputs; - -@SuppressWarnings({ "deprecation"}) -public class KafkaETLContext { - - static protected int MAX_RETRY_TIME = 1; - final static String CLIENT_BUFFER_SIZE = "client.buffer.size"; - final static String CLIENT_TIMEOUT = "client.so.timeout"; - - final static int DEFAULT_BUFFER_SIZE = 1 * 1024 * 1024; - final static int DEFAULT_TIMEOUT = 60000; // one minute - - final static KafkaETLKey DUMMY_KEY = new KafkaETLKey(); - - protected int _index; /*index of context*/ - protected String _input = null; /*input string*/ - protected KafkaETLRequest _request = null; - protected SimpleConsumer _consumer = null; /*simple consumer*/ - - protected long[] _offsetRange = {0, 0}; /*offset range*/ - protected long _offset = Long.MAX_VALUE; /*current offset*/ - protected long _count; /*current count*/ - - protected FetchResponse _response = null; /*fetch response*/ - protected Iterator<MessageAndOffset> _messageIt = null; /*message iterator*/ - protected Iterator<ByteBufferMessageSet> _respIterator = null; - protected int _retry = 0; - protected long _requestTime = 0; /*accumulative request time*/ - protected long _startTime = -1; - - protected int _bufferSize; - protected int _timeout; - protected Reporter _reporter; - - protected MultipleOutputs _mos; - protected OutputCollector<KafkaETLKey, BytesWritable> _offsetOut = null; - protected FetchRequestBuilder builder = new FetchRequestBuilder(); - - public long getTotalBytes() { - return (_offsetRange[1] > _offsetRange[0])? 
_offsetRange[1] - _offsetRange[0] : 0; - } - - public long getReadBytes() { - return _offset - _offsetRange[0]; - } - - public long getCount() { - return _count; - } - - /** - * construct using input string - */ - @SuppressWarnings("unchecked") - public KafkaETLContext(JobConf job, Props props, Reporter reporter, - MultipleOutputs mos, int index, String input) - throws Exception { - - _bufferSize = getClientBufferSize(props); - _timeout = getClientTimeout(props); - System.out.println("bufferSize=" +_bufferSize); - System.out.println("timeout=" + _timeout); - _reporter = reporter; - _mos = mos; - - // read topic and current offset from input - _index= index; - _input = input; - _request = new KafkaETLRequest(input.trim()); - - // read data from queue - URI uri = _request.getURI(); - _consumer = new SimpleConsumer(uri.getHost(), uri.getPort(), _timeout, _bufferSize, "KafkaETLContext"); - - // get available offset range - _offsetRange = getOffsetRange(); - System.out.println("Connected to node " + uri - + " beginning reading at offset " + _offsetRange[0] - + " latest offset=" + _offsetRange[1]); - - _offset = _offsetRange[0]; - _count = 0; - _requestTime = 0; - _retry = 0; - - _startTime = System.currentTimeMillis(); - } - - public boolean hasMore () { - return _messageIt != null && _messageIt.hasNext() - || _response != null && _respIterator.hasNext() - || _offset < _offsetRange[1]; - } - - public boolean getNext(KafkaETLKey key, BytesWritable value) throws IOException { - if ( !hasMore() ) return false; - - boolean gotNext = get(key, value); - - if(_response != null) { - - while ( !gotNext && _respIterator.hasNext()) { - ByteBufferMessageSet msgSet = _respIterator.next(); - _messageIt = msgSet.iterator(); - gotNext = get(key, value); - } - } - return gotNext; - } - - public boolean fetchMore () throws IOException { - if (!hasMore()) return false; - - FetchRequest fetchRequest = builder - .clientId(_request.clientId()) - .addFetch(_request.getTopic(), _request.getPartition(), _offset, _bufferSize) - .build(); - - long tempTime = System.currentTimeMillis(); - _response = _consumer.fetch(fetchRequest); - if(_response != null) { - _respIterator = new ArrayList<ByteBufferMessageSet>(){{ - add(_response.messageSet(_request.getTopic(), _request.getPartition())); - }}.iterator(); - } - _requestTime += (System.currentTimeMillis() - tempTime); - - return true; - } - - @SuppressWarnings("unchecked") - public void output(String fileprefix) throws IOException { - String offsetString = _request.toString(_offset); - - if (_offsetOut == null) - _offsetOut = (OutputCollector<KafkaETLKey, BytesWritable>) - _mos.getCollector("offsets", fileprefix+_index, _reporter); - _offsetOut.collect(DUMMY_KEY, new BytesWritable(offsetString.getBytes("UTF-8"))); - - } - - public void close() throws IOException { - if (_consumer != null) _consumer.close(); - - String topic = _request.getTopic(); - long endTime = System.currentTimeMillis(); - _reporter.incrCounter(topic, "read-time(ms)", endTime - _startTime); - _reporter.incrCounter(topic, "request-time(ms)", _requestTime); - - long bytesRead = _offset - _offsetRange[0]; - double megaRead = bytesRead / (1024.0*1024.0); - _reporter.incrCounter(topic, "data-read(mb)", (long) megaRead); - _reporter.incrCounter(topic, "event-count", _count); - } - - protected boolean get(KafkaETLKey key, BytesWritable value) throws IOException { - if (_messageIt != null && _messageIt.hasNext()) { - MessageAndOffset messageAndOffset = _messageIt.next(); - - ByteBuffer buf = 
messageAndOffset.message().buffer(); - int origSize = buf.remaining(); - byte[] bytes = new byte[origSize]; - buf.get(bytes, buf.position(), origSize); - value.set(bytes, 0, origSize); - - key.set(_index, _offset, messageAndOffset.message().checksum()); - - _offset = messageAndOffset.nextOffset(); //increase offset - _count ++; //increase count - - return true; - } - else return false; - } - - /** - * Get offset ranges - */ - protected long[] getOffsetRange() throws IOException { - - /* get smallest and largest offsets*/ - long[] range = new long[2]; - - TopicAndPartition topicAndPartition = new TopicAndPartition(_request.getTopic(), _request.getPartition()); - Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = - new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>(); - requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.EarliestTime(), 1)); - OffsetRequest request = new OffsetRequest( - requestInfo, kafka.api.OffsetRequest.CurrentVersion(), kafka.api.OffsetRequest.DefaultClientId()); - long[] startOffsets = _consumer.getOffsetsBefore(request).offsets(_request.getTopic(), _request.getPartition()); - if (startOffsets.length != 1) - throw new IOException("input:" + _input + " Expect one smallest offset but get " - + startOffsets.length); - range[0] = startOffsets[0]; - - requestInfo.clear(); - requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.LatestTime(), 1)); - request = new OffsetRequest( - requestInfo, kafka.api.OffsetRequest.CurrentVersion(), kafka.api.OffsetRequest.DefaultClientId()); - long[] endOffsets = _consumer.getOffsetsBefore(request).offsets(_request.getTopic(), _request.getPartition()); - if (endOffsets.length != 1) - throw new IOException("input:" + _input + " Expect one latest offset but get " - + endOffsets.length); - range[1] = endOffsets[0]; - - /*adjust range based on input offsets*/ - if ( _request.isValidOffset()) { - long startOffset = _request.getOffset(); - if (startOffset > range[0]) { - System.out.println("Update starting offset with " + startOffset); - range[0] = startOffset; - } - else { - System.out.println("WARNING: given starting offset " + startOffset - + " is smaller than the smallest one " + range[0] - + ". Will ignore it."); - } - } - System.out.println("Using offset range [" + range[0] + ", " + range[1] + "]"); - return range; - } - - public static int getClientBufferSize(Props props) throws Exception { - return props.getInt(CLIENT_BUFFER_SIZE, DEFAULT_BUFFER_SIZE); - } - - public static int getClientTimeout(Props props) throws Exception { - return props.getInt(CLIENT_TIMEOUT, DEFAULT_TIMEOUT); - } - -} http://git-wip-us.apache.org/repos/asf/kafka/blob/e176fcc7/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLInputFormat.java ---------------------------------------------------------------------- diff --git a/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLInputFormat.java b/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLInputFormat.java deleted file mode 100644 index ddd6b72..0000000 --- a/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLInputFormat.java +++ /dev/null @@ -1,78 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package kafka.etl; - - -import java.io.IOException; -import java.net.URI; -import java.util.Map; -import kafka.consumer.SimpleConsumer; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.BytesWritable; -import org.apache.hadoop.mapred.InputSplit; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.OutputCollector; -import org.apache.hadoop.mapred.RecordReader; -import org.apache.hadoop.mapred.Reporter; -import org.apache.hadoop.mapred.SequenceFileInputFormat; -import org.apache.hadoop.mapred.lib.MultipleOutputs; - - -@SuppressWarnings("deprecation") -public class KafkaETLInputFormat -extends SequenceFileInputFormat<KafkaETLKey, BytesWritable> { - - protected Props _props; - protected int _bufferSize; - protected int _soTimeout; - - protected Map<Integer, URI> _nodes; - protected int _partition; - protected int _nodeId; - protected String _topic; - protected SimpleConsumer _consumer; - - protected MultipleOutputs _mos; - protected OutputCollector<BytesWritable, BytesWritable> _offsetOut = null; - - protected long[] _offsetRange; - protected long _startOffset; - protected long _offset; - protected boolean _toContinue = true; - protected int _retry; - protected long _timestamp; - protected long _count; - protected boolean _ignoreErrors = false; - - @Override - public RecordReader<KafkaETLKey, BytesWritable> getRecordReader(InputSplit split, - JobConf job, Reporter reporter) - throws IOException { - return new KafkaETLRecordReader(split, job, reporter); - } - - @Override - protected boolean isSplitable(FileSystem fs, Path file) { - return super.isSplitable(fs, file); - } - - @Override - public InputSplit[] getSplits(JobConf conf, int numSplits) throws IOException { - return super.getSplits(conf, numSplits); - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/e176fcc7/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLJob.java ---------------------------------------------------------------------- diff --git a/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLJob.java b/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLJob.java deleted file mode 100644 index 1a4bcba..0000000 --- a/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLJob.java +++ /dev/null @@ -1,172 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package kafka.etl; - - -import java.net.URI; -import org.apache.hadoop.filecache.DistributedCache; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.BytesWritable; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.SequenceFileOutputFormat; -import org.apache.hadoop.mapred.lib.MultipleOutputs; - -@SuppressWarnings("deprecation") -public class KafkaETLJob { - - public static final String HADOOP_PREFIX = "hadoop-conf."; - /** - * Create a job configuration - */ - @SuppressWarnings("rawtypes") - public static JobConf createJobConf(String name, String topic, Props props, Class classobj) - throws Exception { - JobConf conf = getJobConf(name, props, classobj); - - conf.set("topic", topic); - - // input format - conf.setInputFormat(KafkaETLInputFormat.class); - - //turn off mapper speculative execution - conf.setMapSpeculativeExecution(false); - - // setup multiple outputs - MultipleOutputs.addMultiNamedOutput(conf, "offsets", SequenceFileOutputFormat.class, - KafkaETLKey.class, BytesWritable.class); - - - return conf; - } - - /** - * Helper function to initialize a job configuration - */ - public static JobConf getJobConf(String name, Props props, Class classobj) throws Exception { - JobConf conf = new JobConf(); - // set custom class loader with custom find resource strategy. 
- - conf.setJobName(name); - String hadoop_ugi = props.getProperty("hadoop.job.ugi", null); - if (hadoop_ugi != null) { - conf.set("hadoop.job.ugi", hadoop_ugi); - } - - if (props.getBoolean("is.local", false)) { - conf.set("mapred.job.tracker", "local"); - conf.set("fs.default.name", "file:///"); - conf.set("mapred.local.dir", "/tmp/map-red"); - - info("Running locally, no hadoop jar set."); - } else { - setClassLoaderAndJar(conf, classobj); - info("Setting hadoop jar file for class:" + classobj + " to " + conf.getJar()); - info("*************************************************************************"); - info(" Running on Real Hadoop Cluster(" + conf.get("mapred.job.tracker") + ") "); - info("*************************************************************************"); - } - - // set JVM options if present - if (props.containsKey("mapred.child.java.opts")) { - conf.set("mapred.child.java.opts", props.getProperty("mapred.child.java.opts")); - info("mapred.child.java.opts set to " + props.getProperty("mapred.child.java.opts")); - } - - // Adds External jars to hadoop classpath - String externalJarList = props.getProperty("hadoop.external.jarFiles", null); - if (externalJarList != null) { - String[] jarFiles = externalJarList.split(","); - for (String jarFile : jarFiles) { - info("Adding extenral jar File:" + jarFile); - DistributedCache.addFileToClassPath(new Path(jarFile), conf); - } - } - - // Adds distributed cache files - String cacheFileList = props.getProperty("hadoop.cache.files", null); - if (cacheFileList != null) { - String[] cacheFiles = cacheFileList.split(","); - for (String cacheFile : cacheFiles) { - info("Adding Distributed Cache File:" + cacheFile); - DistributedCache.addCacheFile(new URI(cacheFile), conf); - } - } - - // Adds distributed cache files - String archiveFileList = props.getProperty("hadoop.cache.archives", null); - if (archiveFileList != null) { - String[] archiveFiles = archiveFileList.split(","); - for (String archiveFile : archiveFiles) { - info("Adding Distributed Cache Archive File:" + archiveFile); - DistributedCache.addCacheArchive(new URI(archiveFile), conf); - } - } - - String hadoopCacheJarDir = props.getProperty("hdfs.default.classpath.dir", null); - if (hadoopCacheJarDir != null) { - FileSystem fs = FileSystem.get(conf); - if (fs != null) { - FileStatus[] status = fs.listStatus(new Path(hadoopCacheJarDir)); - - if (status != null) { - for (int i = 0; i < status.length; ++i) { - if (!status[i].isDir()) { - Path path = new Path(hadoopCacheJarDir, status[i].getPath().getName()); - info("Adding Jar to Distributed Cache Archive File:" + path); - - DistributedCache.addFileToClassPath(path, conf); - } - } - } else { - info("hdfs.default.classpath.dir " + hadoopCacheJarDir + " is empty."); - } - } else { - info("hdfs.default.classpath.dir " + hadoopCacheJarDir + " filesystem doesn't exist"); - } - } - - // May want to add this to HadoopUtils, but will await refactoring - for (String key : props.stringPropertyNames()) { - String lowerCase = key.toLowerCase(); - if (lowerCase.startsWith(HADOOP_PREFIX)) { - String newKey = key.substring(HADOOP_PREFIX.length()); - conf.set(newKey, props.getProperty(key)); - } - } - - KafkaETLUtils.setPropsInJob(conf, props); - - return conf; - } - - public static void info(String message) { - System.out.println(message); - } - - public static void setClassLoaderAndJar(JobConf conf, - @SuppressWarnings("rawtypes") Class jobClass) { - conf.setClassLoader(Thread.currentThread().getContextClassLoader()); - String jar = 
KafkaETLUtils.findContainingJar(jobClass, Thread
-                .currentThread().getContextClassLoader());
-        if (jar != null) {
-            conf.setJar(jar);
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/e176fcc7/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLKey.java
----------------------------------------------------------------------
diff --git a/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLKey.java b/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLKey.java
deleted file mode 100644
index aafecea..0000000
--- a/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLKey.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.etl;
-
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import org.apache.hadoop.io.WritableComparable;
-
-public class KafkaETLKey implements WritableComparable<KafkaETLKey>{
-
-    protected int _inputIndex;
-    protected long _offset;
-    protected long _checksum;
-
-    /**
-     * dummy empty constructor
-     */
-    public KafkaETLKey() {
-        _inputIndex = 0;
-        _offset = 0;
-        _checksum = 0;
-    }
-
-    public KafkaETLKey(int index, long offset) {
-        _inputIndex = index;
-        _offset = offset;
-        _checksum = 0;
-    }
-
-    public KafkaETLKey(int index, long offset, long checksum) {
-        _inputIndex = index;
-        _offset = offset;
-        _checksum = checksum;
-    }
-
-    public void set(int index, long offset, long checksum) {
-        _inputIndex = index;
-        _offset = offset;
-        _checksum = checksum;
-    }
-
-    public int getIndex() {
-        return _inputIndex;
-    }
-
-    public long getOffset() {
-        return _offset;
-    }
-
-    public long getChecksum() {
-        return _checksum;
-    }
-
-    @Override
-    public void readFields(DataInput in) throws IOException {
-        _inputIndex = in.readInt();
-        _offset = in.readLong();
-        _checksum = in.readLong();
-    }
-
-    @Override
-    public void write(DataOutput out) throws IOException {
-        out.writeInt(_inputIndex);
-        out.writeLong(_offset);
-        out.writeLong(_checksum);
-    }
-
-    @Override
-    public int compareTo(KafkaETLKey o) {
-        if (_inputIndex != o._inputIndex)
-            return _inputIndex - o._inputIndex;
-        else {
-            if (_offset > o._offset) return 1;
-            else if (_offset < o._offset) return -1;
-            else {
-                if (_checksum > o._checksum) return 1;
-                else if (_checksum < o._checksum) return -1;
-                else return 0;
-            }
-        }
-    }
-
-    @Override
-    public String toString() {
-        return "index=" + _inputIndex + " offset=" + _offset + " checksum=" + _checksum;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/e176fcc7/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLRecordReader.java
----------------------------------------------------------------------
diff --git a/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLRecordReader.java
b/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLRecordReader.java deleted file mode 100644 index f040fbe..0000000 --- a/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLRecordReader.java +++ /dev/null @@ -1,180 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package kafka.etl; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; - -import kafka.common.KafkaException; -import org.apache.hadoop.io.BytesWritable; -import org.apache.hadoop.mapred.FileSplit; -import org.apache.hadoop.mapred.InputSplit; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.Reporter; -import org.apache.hadoop.mapred.SequenceFileRecordReader; -import org.apache.hadoop.mapred.lib.MultipleOutputs; - -@SuppressWarnings({ "deprecation" }) -public class KafkaETLRecordReader -extends SequenceFileRecordReader<KafkaETLKey, BytesWritable> { - - /* max number of retries */ - protected Props _props; /*properties*/ - protected JobConf _job; - protected Reporter _reporter ; - protected MultipleOutputs _mos; - protected List<KafkaETLContext> _contextList; - protected int _contextIndex ; - - protected long _totalBytes; - protected long _readBytes; - protected long _readCounts; - - protected String _attemptId = null; - - private static long _limit = 100; /*for testing only*/ - - public KafkaETLRecordReader(InputSplit split, JobConf job, Reporter reporter) - throws IOException { - super(job, (FileSplit) split); - - _props = KafkaETLUtils.getPropsFromJob(job); - _contextList = new ArrayList<KafkaETLContext>(); - _job = job; - _reporter = reporter; - _contextIndex = -1; - _mos = new MultipleOutputs(job); - try { - _limit = _props.getInt("kafka.request.limit", -1); - - /*get attemp id*/ - String taskId = _job.get("mapred.task.id"); - if (taskId == null) { - throw new KafkaException("Configuration does not contain the property mapred.task.id"); - } - String[] parts = taskId.split("_"); - if ( parts.length != 6 || !parts[0].equals("attempt") - || (!"m".equals(parts[3]) && !"r".equals(parts[3]))) { - throw new KafkaException("TaskAttemptId string : " + taskId + " is not properly formed"); - } - _attemptId = parts[4]+parts[3]; - }catch (Exception e) { - throw new IOException (e); - } - } - - @Override - public synchronized void close() throws IOException { - super.close(); - - /* now record some stats */ - for (KafkaETLContext context: _contextList) { - context.output(_attemptId); - context.close(); - } - - _mos.close(); - } - - @Override - public KafkaETLKey createKey() { - return super.createKey(); - } - - @Override - public BytesWritable createValue() { - return super.createValue(); - } - - @Override - public float getProgress() throws IOException { - if (_totalBytes == 0) return 0f; - - if (_contextIndex >= 
_contextList.size()) return 1f; - - if (_limit < 0) { - double p = ( _readBytes + getContext().getReadBytes() ) / ((double) _totalBytes); - return (float)p; - } - else { - double p = (_readCounts + getContext().getCount()) / ((double)_limit * _contextList.size()); - return (float)p; - } - } - - @Override - public synchronized boolean next(KafkaETLKey key, BytesWritable value) - throws IOException { - try{ - if (_contextIndex < 0) { /* first call, get all requests */ - System.out.println("RecordReader.next init()"); - _totalBytes = 0; - - while ( super.next(key, value)) { - String input = new String(value.getBytes(), "UTF-8"); - int index = _contextList.size(); - KafkaETLContext context = new KafkaETLContext( - _job, _props, _reporter, _mos, index, input); - _contextList.add(context); - _totalBytes += context.getTotalBytes(); - } - System.out.println("Number of requests=" + _contextList.size()); - - _readBytes = 0; - _readCounts = 0; - _contextIndex = 0; - } - - while (_contextIndex < _contextList.size()) { - - KafkaETLContext currContext = getContext(); - - while (currContext.hasMore() && - (_limit < 0 || currContext.getCount() < _limit)) { - - if (currContext.getNext(key, value)) { - //System.out.println("RecordReader.next get (key,value)"); - return true; - } - else { - //System.out.println("RecordReader.next fetch more"); - currContext.fetchMore(); - } - } - - _readBytes += currContext.getReadBytes(); - _readCounts += currContext.getCount(); - _contextIndex ++; - System.out.println("RecordReader.next will get from request " + _contextIndex); - } - }catch (Exception e) { - throw new IOException (e); - } - return false; - } - - protected KafkaETLContext getContext() throws IOException{ - if (_contextIndex >= _contextList.size()) - throw new IOException ("context index " + _contextIndex + " is out of bound " - + _contextList.size()); - return _contextList.get(_contextIndex); - } - - - -} http://git-wip-us.apache.org/repos/asf/kafka/blob/e176fcc7/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLRequest.java ---------------------------------------------------------------------- diff --git a/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLRequest.java b/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLRequest.java deleted file mode 100644 index 87df0ea..0000000 --- a/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLRequest.java +++ /dev/null @@ -1,129 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */
-package kafka.etl;
-
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.Map;
-
-public class KafkaETLRequest {
-    public static long DEFAULT_OFFSET = -1;
-    public static String DELIM = "\t";
-
-    String _topic;
-    URI _uri;
-    int _partition;
-    long _offset = DEFAULT_OFFSET;
-    String _clientId = "KafkaHadoopETL";
-
-    public KafkaETLRequest() {
-
-    }
-
-    public KafkaETLRequest(String input) throws IOException {
-        //System.out.println("Init request from " + input);
-        String[] pieces = input.trim().split(DELIM);
-        if (pieces.length != 4)
-            throw new IOException(input +
-                    " : input must be in the form 'url" + DELIM +
-                    "topic" + DELIM + "partition" + DELIM + "offset'");
-
-        try {
-            _uri = new URI(pieces[0]);
-        } catch (java.net.URISyntaxException e) {
-            throw new IOException(e);
-        }
-        _topic = pieces[1];
-        _partition = Integer.valueOf(pieces[2]);
-        _offset = Long.valueOf(pieces[3]);
-    }
-
-    public KafkaETLRequest(String node, String topic, String partition, String offset,
-                           Map<String, String> nodes) throws IOException {
-
-        Integer nodeId = Integer.parseInt(node);
-        String uri = nodes.get(nodeId.toString());
-        if (uri == null) throw new IOException("Cannot form node for id " + nodeId);
-
-        try {
-            _uri = new URI(uri);
-        } catch (java.net.URISyntaxException e) {
-            throw new IOException(e);
-        }
-        _topic = topic;
-        _partition = Integer.valueOf(partition);
-        _offset = Long.valueOf(offset);
-    }
-
-    public KafkaETLRequest(String topic, String uri, int partition) throws URISyntaxException {
-        _topic = topic;
-        _uri = new URI(uri);
-        _partition = partition;
-    }
-
-    public void setDefaultOffset() {
-        _offset = DEFAULT_OFFSET;
-    }
-
-    public void setOffset(long offset) {
-        _offset = offset;
-    }
-
-    public String getTopic() { return _topic; }
-    public URI getURI() { return _uri; }
-    public int getPartition() { return _partition; }
-    public long getOffset() { return _offset; }
-    public String clientId() { return _clientId; }
-
-    public boolean isValidOffset() {
-        return _offset >= 0;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (! (o instanceof KafkaETLRequest))
-            return false;
-
-        KafkaETLRequest r = (KafkaETLRequest) o;
-        return this._topic.equals(r._topic) &&
-               this._uri.equals(r._uri) &&
-               this._partition == r._partition;
-    }
-
-    @Override
-    public int hashCode() {
-        return toString(0).hashCode();
-    }
-
-    @Override
-    public String toString() {
-        return toString(_offset);
-    }
-
-
-    public String toString(long offset) {
-
-        return
-            _uri + DELIM +
-            _topic + DELIM +
-            _partition + DELIM +
-            offset;
-    }
-
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/e176fcc7/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLUtils.java
----------------------------------------------------------------------
diff --git a/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLUtils.java b/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLUtils.java
deleted file mode 100644
index 02d79a1..0000000
--- a/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLUtils.java
+++ /dev/null
@@ -1,205 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.etl; - - -import java.io.BufferedReader; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.FileNotFoundException; -import java.io.FileWriter; -import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.io.PrintWriter; -import java.net.URL; -import java.net.URLDecoder; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Enumeration; -import java.util.List; -import java.util.Properties; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.PathFilter; -import org.apache.hadoop.io.BytesWritable; - -public class KafkaETLUtils { - - public static PathFilter PATH_FILTER = new PathFilter() { - @Override - public boolean accept(Path path) { - return !path.getName().startsWith("_") - && !path.getName().startsWith("."); - } - }; - - - public static Path getLastPath(Path path, FileSystem fs) throws IOException { - - FileStatus[] statuses = fs.listStatus(path, PATH_FILTER); - - if (statuses.length == 0) { - return path; - } else { - Arrays.sort(statuses); - return statuses[statuses.length - 1].getPath(); - } - } - - public static String getFileName(Path path) throws IOException { - String fullname = path.toUri().toString(); - String[] parts = fullname.split(Path.SEPARATOR); - if (parts.length < 1) - throw new IOException("Invalid path " + fullname); - return parts[parts.length - 1]; - } - - public static List<String> readText(FileSystem fs, String inputFile) - throws IOException, FileNotFoundException { - Path path = new Path(inputFile); - return readText(fs, path); - } - - public static List<String> readText(FileSystem fs, Path path) - throws IOException, FileNotFoundException { - if (!fs.exists(path)) { - throw new FileNotFoundException("File " + path + " doesn't exist!"); - } - BufferedReader in = new BufferedReader(new InputStreamReader( - fs.open(path))); - List<String> buf = new ArrayList<String>(); - String line = null; - - while ((line = in.readLine()) != null) { - if (line.trim().length() > 0) - buf.add(new String(line.trim())); - } - in.close(); - return buf; - } - - public static void writeText(FileSystem fs, Path outPath, String content) - throws IOException { - long timestamp = System.currentTimeMillis(); - String localFile = "/tmp/KafkaETL_tmp_" + timestamp; - PrintWriter writer = new PrintWriter(new FileWriter(localFile)); - writer.println(content); - writer.close(); - - Path src = new Path(localFile); - fs.moveFromLocalFile(src, outPath); - } - - public static Props getPropsFromJob(Configuration conf) { - String propsString = conf.get("kafka.etl.props"); - if (propsString == null) - throw new UndefinedPropertyException( - "The required property kafka.etl.props was not found in the Configuration."); - try { - ByteArrayInputStream input = new ByteArrayInputStream( - propsString.getBytes("UTF-8")); - Properties properties = new Properties(); - properties.load(input); - return new Props(properties); - } catch 
(IOException e) { - throw new RuntimeException("This is not possible!", e); - } - } - - public static void setPropsInJob(Configuration conf, Props props) - { - ByteArrayOutputStream output = new ByteArrayOutputStream(); - try - { - props.store(output); - conf.set("kafka.etl.props", new String(output.toByteArray(), "UTF-8")); - } - catch (IOException e) - { - throw new RuntimeException("This is not possible!", e); - } - } - - public static Props readProps(String file) throws IOException { - Path path = new Path(file); - FileSystem fs = path.getFileSystem(new Configuration()); - if (fs.exists(path)) { - InputStream input = fs.open(path); - try { - // wrap it up in another layer so that the user can override - // properties - Props p = new Props(input); - return new Props(p); - } finally { - input.close(); - } - } else { - return new Props(); - } - } - - public static String findContainingJar( - @SuppressWarnings("rawtypes") Class my_class, ClassLoader loader) { - String class_file = my_class.getName().replaceAll("\\.", "/") - + ".class"; - return findContainingJar(class_file, loader); - } - - public static String findContainingJar(String fileName, ClassLoader loader) { - try { - for (@SuppressWarnings("rawtypes") - Enumeration itr = loader.getResources(fileName); itr - .hasMoreElements();) { - URL url = (URL) itr.nextElement(); - // logger.info("findContainingJar finds url:" + url); - if ("jar".equals(url.getProtocol())) { - String toReturn = url.getPath(); - if (toReturn.startsWith("file:")) { - toReturn = toReturn.substring("file:".length()); - } - toReturn = URLDecoder.decode(toReturn, "UTF-8"); - return toReturn.replaceAll("!.*$", ""); - } - } - } catch (IOException e) { - throw new RuntimeException(e); - } - return null; - } - - public static byte[] getBytes(BytesWritable val) { - - byte[] buffer = val.getBytes(); - - /* FIXME: remove the following part once the below gira is fixed - * https://issues.apache.org/jira/browse/HADOOP-6298 - */ - long len = val.getLength(); - byte [] bytes = buffer; - if (len < buffer.length) { - bytes = new byte[(int) len]; - System.arraycopy(buffer, 0, bytes, 0, (int)len); - } - - return bytes; - } - -} http://git-wip-us.apache.org/repos/asf/kafka/blob/e176fcc7/contrib/hadoop-consumer/src/main/java/kafka/etl/Props.java ---------------------------------------------------------------------- diff --git a/contrib/hadoop-consumer/src/main/java/kafka/etl/Props.java b/contrib/hadoop-consumer/src/main/java/kafka/etl/Props.java deleted file mode 100644 index 71eb80f..0000000 --- a/contrib/hadoop-consumer/src/main/java/kafka/etl/Props.java +++ /dev/null @@ -1,458 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package kafka.etl; - -import java.io.BufferedInputStream; -import java.io.File; -import java.io.FileInputStream; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.lang.reflect.Constructor; -import java.net.URI; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.Set; - -import kafka.common.KafkaException; -import org.apache.log4j.Logger; - -public class Props extends Properties { - - private static final long serialVersionUID = 1L; - private static Logger logger = Logger.getLogger(Props.class); - - /** - * default constructor - */ - public Props() { - super(); - } - - /** - * copy constructor - * @param props - */ - public Props(Props props) { - if (props != null) { - this.put(props); - } - } - - /** - * construct props from a list of files - * @param files paths of files - * @throws FileNotFoundException - * @throws IOException - */ - public Props(String... files) throws FileNotFoundException, IOException { - this(Arrays.asList(files)); - } - - /** - * construct props from a list of files - * @param files paths of files - * @throws FileNotFoundException - * @throws IOException - */ - public Props(List<String> files) throws FileNotFoundException, IOException { - - for (int i = 0; i < files.size(); i++) { - InputStream input = new BufferedInputStream(new FileInputStream( - new File(files.get(i)).getAbsolutePath())); - super.load(input); - input.close(); - } - } - - /** - * construct props from a list of input streams - * @param inputStreams - * @throws IOException - */ - public Props(InputStream... inputStreams) throws IOException { - for (InputStream stream : inputStreams) - super.load(stream); - } - - /** - * construct props from a list of maps - * @param props - */ - public Props(Map<String, String>... props) { - for (int i = props.length - 1; i >= 0; i--) - super.putAll(props[i]); - } - - /** - * construct props from a list of Properties - * @param properties - */ - public Props(Properties... properties) { - for (int i = properties.length - 1; i >= 0; i--){ - this.put(properties[i]); - } - } - - /** - * build props from a list of strings and interpret them as - * key, value, key, value,.... - * - * @param args - * @return props - */ - @SuppressWarnings("unchecked") - public static Props of(String... args) { - if (args.length % 2 != 0) - throw new KafkaException( - "Must have an equal number of keys and values."); - Map<String, String> vals = new HashMap<String, String>(args.length / 2); - for (int i = 0; i < args.length; i += 2) - vals.put(args[i], args[i + 1]); - return new Props(vals); - } - - /** - * Put the given Properties into the Props. 
- * - * @param properties - * The properties to put - * - */ - public void put(Properties properties) { - for (String propName : properties.stringPropertyNames()) { - super.put(propName, properties.getProperty(propName)); - } - } - - /** - * get property of "key" and split the value by " ," - * @param key - * @return list of values - */ - public List<String> getStringList(String key) { - return getStringList(key, "\\s*,\\s*"); - } - - /** - * get property of "key" and split the value by "sep" - * @param key - * @param sep - * @return string list of values - */ - public List<String> getStringList(String key, String sep) { - String val = super.getProperty(key); - if (val == null || val.trim().length() == 0) - return Collections.emptyList(); - - if (containsKey(key)) - return Arrays.asList(val.split(sep)); - else - throw new UndefinedPropertyException("Missing required property '" - + key + "'"); - } - - /** - * get string list with default value. default delimiter is "," - * @param key - * @param defaultValue - * @return string list of values - */ - public List<String> getStringList(String key, List<String> defaultValue) { - if (containsKey(key)) - return getStringList(key); - else - return defaultValue; - } - - /** - * get string list with default value - * @param key - * @param defaultValue - * @return string list of values - */ - public List<String> getStringList(String key, List<String> defaultValue, - String sep) { - if (containsKey(key)) - return getStringList(key, sep); - else - return defaultValue; - } - - @SuppressWarnings("unchecked") - protected <T> T getValue(String key, T defaultValue) - throws Exception { - - if (containsKey(key)) { - Object value = super.get(key); - if (value.getClass().isInstance(defaultValue)) { - return (T)value; - } else if (value instanceof String) { - // call constructor(String) to initialize it - @SuppressWarnings("rawtypes") - Constructor ct = defaultValue.getClass().getConstructor(String.class); - String v = ((String)value).trim(); - Object ret = ct.newInstance(v); - return (T) ret; - } - else throw new UndefinedPropertyException ("Property " + key + - ": cannot convert value of " + value.getClass().getName() + - " to " + defaultValue.getClass().getName()); - } - else { - return defaultValue; - } - } - - @SuppressWarnings("unchecked") - protected <T> T getValue(String key, Class<T> mclass) - throws Exception { - - if (containsKey(key)) { - Object value = super.get(key); - if (value.getClass().equals(mclass)) { - return (T)value; - } else if (value instanceof String) { - // call constructor(String) to initialize it - @SuppressWarnings("rawtypes") - Constructor ct = mclass.getConstructor(String.class); - String v = ((String)value).trim(); - Object ret = ct.newInstance(v); - return (T) ret; - } - else throw new UndefinedPropertyException ("Property " + key + - ": cannot convert value of " + value.getClass().getName() + - " to " + mclass.getClass().getName()); - } - else { - throw new UndefinedPropertyException ("Missing required property '" - + key + "'"); - } - } - - /** - * get boolean value with default value - * @param key - * @param defaultValue - * @return boolean value - * @throws Exception if value is not of type boolean or string - */ - public Boolean getBoolean(String key, Boolean defaultValue) - throws Exception { - return getValue (key, defaultValue); - } - - /** - * get boolean value - * @param key - * @return boolean value - * @throws Exception if value is not of type boolean or string or - * if value doesn't exist - */ - public Boolean 
getBoolean(String key) throws Exception { - return getValue (key, Boolean.class); - } - - /** - * get long value with default value - * @param name - * @param defaultValue - * @return long value - * @throws Exception if value is not of type long or string - */ - public Long getLong(String name, Long defaultValue) - throws Exception { - return getValue(name, defaultValue); - } - - /** - * get long value - * @param name - * @return long value - * @throws Exception if value is not of type long or string or - * if value doesn't exist - */ - public Long getLong(String name) throws Exception { - return getValue (name, Long.class); - } - - /** - * get integer value with default value - * @param name - * @param defaultValue - * @return integer value - * @throws Exception if value is not of type integer or string - */ - public Integer getInt(String name, Integer defaultValue) - throws Exception { - return getValue(name, defaultValue); - } - - /** - * get integer value - * @param name - * @return integer value - * @throws Exception if value is not of type integer or string or - * if value doesn't exist - */ - public Integer getInt(String name) throws Exception { - return getValue (name, Integer.class); - } - - /** - * get double value with default value - * @param name - * @param defaultValue - * @return double value - * @throws Exception if value is not of type double or string - */ - public Double getDouble(String name, double defaultValue) - throws Exception { - return getValue(name, defaultValue); - } - - /** - * get double value - * @param name - * @return double value - * @throws Exception if value is not of type double or string or - * if value doesn't exist - */ - public double getDouble(String name) throws Exception { - return getValue(name, Double.class); - } - - /** - * get URI value with default value - * @param name - * @param defaultValue - * @return URI value - * @throws Exception if value is not of type URI or string - */ - public URI getUri(String name, URI defaultValue) throws Exception { - return getValue(name, defaultValue); - } - - /** - * get URI value - * @param name - * @param defaultValue - * @return URI value - * @throws Exception if value is not of type URI or string - */ - public URI getUri(String name, String defaultValue) - throws Exception { - URI defaultV = new URI(defaultValue); - return getValue(name, defaultV); - } - - /** - * get URI value - * @param name - * @return URI value - * @throws Exception if value is not of type URI or string or - * if value doesn't exist - */ - public URI getUri(String name) throws Exception { - return getValue(name, URI.class); - } - - /** - * compare two props - * @param p - * @return true or false - */ - public boolean equalsProps(Props p) { - if (p == null) { - return false; - } - - final Set<String> myKeySet = getKeySet(); - for (String s : myKeySet) { - if (!get(s).equals(p.get(s))) { - return false; - } - } - - return myKeySet.size() == p.getKeySet().size(); - } - - - /** - * Get a map of all properties by string prefix - * - * @param prefix - * The string prefix - */ - public Map<String, String> getMapByPrefix(String prefix) { - Map<String, String> values = new HashMap<String, String>(); - - for (String key : super.stringPropertyNames()) { - if (key.startsWith(prefix)) { - values.put(key.substring(prefix.length()), super.getProperty(key)); - } - } - return values; - } - - /** - * Store all properties - * - * @param out The stream to write to - * @throws IOException If there is an error writing - */ - public void 
store(OutputStream out) throws IOException { - super.store(out, null); - } - - /** - * get all property names - * @return set of property names - */ - public Set<String> getKeySet() { - return super.stringPropertyNames(); - } - - /** - * log properties - * @param comment - */ - public void logProperties(String comment) { - logger.info(comment); - - for (String key : getKeySet()) { - logger.info(" key=" + key + " value=" + get(key)); - } - } - - /** - * clone a Props - * @param p - * @return props - */ - public static Props clone(Props p) { - return new Props(p); - } - - -} http://git-wip-us.apache.org/repos/asf/kafka/blob/e176fcc7/contrib/hadoop-consumer/src/main/java/kafka/etl/UndefinedPropertyException.java ---------------------------------------------------------------------- diff --git a/contrib/hadoop-consumer/src/main/java/kafka/etl/UndefinedPropertyException.java b/contrib/hadoop-consumer/src/main/java/kafka/etl/UndefinedPropertyException.java deleted file mode 100644 index 9278122..0000000 --- a/contrib/hadoop-consumer/src/main/java/kafka/etl/UndefinedPropertyException.java +++ /dev/null @@ -1,28 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.etl; - -public class UndefinedPropertyException extends RuntimeException { - - private static final long serialVersionUID = 1; - - public UndefinedPropertyException(String message) { - super(message); - } - -} http://git-wip-us.apache.org/repos/asf/kafka/blob/e176fcc7/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/DataGenerator.java ---------------------------------------------------------------------- diff --git a/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/DataGenerator.java b/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/DataGenerator.java deleted file mode 100644 index d27a511..0000000 --- a/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/DataGenerator.java +++ /dev/null @@ -1,134 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package kafka.etl.impl; - - -import java.net.URI; -import java.util.ArrayList; -import java.util.List; -import java.util.Properties; -import java.util.Random; -import kafka.etl.KafkaETLKey; -import kafka.etl.KafkaETLRequest; -import kafka.etl.Props; -import kafka.javaapi.producer.Producer; -import kafka.producer.ProducerConfig; -import kafka.producer.KeyedMessage; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.BytesWritable; -import org.apache.hadoop.io.SequenceFile; -import org.apache.hadoop.mapred.JobConf; - -import static org.apache.kafka.common.utils.Utils.formatAddress; - -/** - * Use this class to produce test events to Kafka server. Each event contains a - * random timestamp in text format. - */ -@SuppressWarnings("deprecation") -public class DataGenerator { - - protected final static Random RANDOM = new Random( - System.currentTimeMillis()); - - protected Props _props; - protected Producer _producer = null; - protected URI _uri = null; - protected String _topic; - protected int _count; - protected String _offsetsDir; - protected final int TCP_BUFFER_SIZE = 300 * 1000; - protected final int CONNECT_TIMEOUT = 20000; // ms - protected final int RECONNECT_INTERVAL = Integer.MAX_VALUE; // ms - - public DataGenerator(String id, Props props) throws Exception { - _props = props; - _topic = props.getProperty("kafka.etl.topic"); - System.out.println("topics=" + _topic); - _count = props.getInt("event.count"); - - _offsetsDir = _props.getProperty("input"); - - // initialize kafka producer to generate count events - String serverUri = _props.getProperty("kafka.server.uri"); - _uri = new URI (serverUri); - - System.out.println("server uri:" + _uri.toString()); - Properties producerProps = new Properties(); - producerProps.put("metadata.broker.list", formatAddress(_uri.getHost(), _uri.getPort())); - producerProps.put("send.buffer.bytes", String.valueOf(TCP_BUFFER_SIZE)); - producerProps.put("connect.timeout.ms", String.valueOf(CONNECT_TIMEOUT)); - producerProps.put("reconnect.interval", String.valueOf(RECONNECT_INTERVAL)); - - _producer = new Producer(new ProducerConfig(producerProps)); - - } - - public void run() throws Exception { - - List<KeyedMessage> list = new ArrayList<KeyedMessage>(); - for (int i = 0; i < _count; i++) { - Long timestamp = RANDOM.nextLong(); - if (timestamp < 0) timestamp = -timestamp; - byte[] bytes = timestamp.toString().getBytes("UTF8"); - list.add(new KeyedMessage<Integer, byte[]>(_topic, null, bytes)); - } - // send events - System.out.println(" send " + list.size() + " " + _topic + " count events to " + _uri); - _producer.send(list); - - // close the producer - _producer.close(); - - // generate offset files - generateOffsets(); - } - - protected void generateOffsets() throws Exception { - JobConf conf = new JobConf(); - conf.set("hadoop.job.ugi", _props.getProperty("hadoop.job.ugi")); - conf.setCompressMapOutput(false); - Path outPath = new Path(_offsetsDir + Path.SEPARATOR + "1.dat"); - FileSystem fs = outPath.getFileSystem(conf); - if (fs.exists(outPath)) fs.delete(outPath); - - KafkaETLRequest request = - new KafkaETLRequest(_topic, "tcp://" + formatAddress(_uri.getHost(), _uri.getPort()), 0); - - System.out.println("Dump " + request.toString() + " to " + outPath.toUri().toString()); - byte[] bytes = request.toString().getBytes("UTF-8"); - KafkaETLKey dummyKey = new KafkaETLKey(); - SequenceFile.setCompressionType(conf, SequenceFile.CompressionType.NONE); - SequenceFile.Writer writer = 
SequenceFile.createWriter(fs, conf, outPath, - KafkaETLKey.class, BytesWritable.class); - writer.append(dummyKey, new BytesWritable(bytes)); - writer.close(); - } - - public static void main(String[] args) throws Exception { - - if (args.length < 1) - throw new Exception("Usage: - config_file"); - - Props props = new Props(args[0]); - DataGenerator job = new DataGenerator("DataGenerator", props); - job.run(); - } - -} http://git-wip-us.apache.org/repos/asf/kafka/blob/e176fcc7/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/SimpleKafkaETLJob.java ---------------------------------------------------------------------- diff --git a/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/SimpleKafkaETLJob.java b/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/SimpleKafkaETLJob.java deleted file mode 100644 index d269704..0000000 --- a/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/SimpleKafkaETLJob.java +++ /dev/null @@ -1,104 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.etl.impl; - -import kafka.etl.KafkaETLInputFormat; -import kafka.etl.KafkaETLJob; -import kafka.etl.Props; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapred.JobClient; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.RunningJob; -import org.apache.hadoop.mapred.TextOutputFormat; - -/** - * This is a simple Kafka ETL job which pulls text events generated by - * DataGenerator and stores them in HDFS - */ -@SuppressWarnings("deprecation") -public class SimpleKafkaETLJob { - - protected String _name; - protected Props _props; - protected String _input; - protected String _output; - protected String _topic; - - public SimpleKafkaETLJob(String name, Props props) throws Exception { - _name = name; - _props = props; - - _input = _props.getProperty("input"); - _output = _props.getProperty("output"); - - _topic = props.getProperty("kafka.etl.topic"); - } - - - protected JobConf createJobConf() throws Exception { - JobConf jobConf = KafkaETLJob.createJobConf("SimpleKafkaETL", _topic, _props, getClass()); - - jobConf.setMapperClass(SimpleKafkaETLMapper.class); - KafkaETLInputFormat.setInputPaths(jobConf, new Path(_input)); - - jobConf.setOutputKeyClass(LongWritable.class); - jobConf.setOutputValueClass(Text.class); - jobConf.setOutputFormat(TextOutputFormat.class); - TextOutputFormat.setCompressOutput(jobConf, false); - Path output = new Path(_output); - FileSystem fs = output.getFileSystem(jobConf); - if (fs.exists(output)) fs.delete(output); - TextOutputFormat.setOutputPath(jobConf, output); - - jobConf.setNumReduceTasks(0); - return jobConf; - } - - public void execute () throws Exception
{ - JobConf conf = createJobConf(); - RunningJob runningJob = new JobClient(conf).submitJob(conf); - String id = runningJob.getJobID(); - System.out.println("Hadoop job id=" + id); - runningJob.waitForCompletion(); - - if (!runningJob.isSuccessful()) - throw new Exception("Hadoop ETL job failed! Please check status on http://" - + conf.get("mapred.job.tracker") + "/jobdetails.jsp?jobid=" + id); - } - - /** - * for testing only - * - * @param args - * @throws Exception - */ - public static void main(String[] args) throws Exception { - - if (args.length < 1) - throw new Exception("Usage: - config_file"); - - Props props = new Props(args[0]); - SimpleKafkaETLJob job = new SimpleKafkaETLJob("SimpleKafkaETLJob", - props); - job.execute(); - } - -} http://git-wip-us.apache.org/repos/asf/kafka/blob/e176fcc7/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/SimpleKafkaETLMapper.java ---------------------------------------------------------------------- diff --git a/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/SimpleKafkaETLMapper.java b/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/SimpleKafkaETLMapper.java deleted file mode 100644 index 0fea5db..0000000 --- a/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/SimpleKafkaETLMapper.java +++ /dev/null @@ -1,91 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package kafka.etl.impl; - -import kafka.etl.KafkaETLKey; -import kafka.etl.KafkaETLUtils; -import kafka.message.Message; -import org.apache.hadoop.io.BytesWritable; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.Mapper; -import org.apache.hadoop.mapred.OutputCollector; -import org.apache.hadoop.mapred.Reporter; -import java.io.IOException; -import java.nio.ByteBuffer; - -/** - * Simple implementation of KafkaETLMapper. It assumes that - * input data are text timestamps (long).
- */ -@SuppressWarnings("deprecation") -public class SimpleKafkaETLMapper implements -Mapper<KafkaETLKey, BytesWritable, LongWritable, Text> { - - protected long _count = 0; - - protected Text getData(Message message) throws IOException { - ByteBuffer buf = message.payload(); - if(buf == null) - return new Text(); - - byte[] array = new byte[buf.limit()]; - buf.get(array); - - Text text = new Text( new String(array, "UTF8")); - return text; - } - - - @Override - public void map(KafkaETLKey key, BytesWritable val, - OutputCollector<LongWritable, Text> collector, - Reporter reporter) throws IOException { - - - byte[] bytes = KafkaETLUtils.getBytes(val); - - //check the checksum of message - Message message = new Message(ByteBuffer.wrap(bytes)); - long checksum = key.getChecksum(); - if (checksum != message.checksum()) - throw new IOException ("Invalid message checksum " - + message.checksum() + ". Expected " + key + "."); - Text data = getData (message); - _count ++; - - collector.collect(new LongWritable (_count), data); - - } - - - @Override - public void configure(JobConf arg0) { - // TODO Auto-generated method stub - - } - - - @Override - public void close() throws IOException { - // TODO Auto-generated method stub - - } - -} http://git-wip-us.apache.org/repos/asf/kafka/blob/e176fcc7/contrib/hadoop-consumer/test/test.properties ---------------------------------------------------------------------- diff --git a/contrib/hadoop-consumer/test/test.properties b/contrib/hadoop-consumer/test/test.properties deleted file mode 100644 index cdea8cc..0000000 --- a/contrib/hadoop-consumer/test/test.properties +++ /dev/null @@ -1,42 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# name of test topic -kafka.etl.topic=SimpleTestEvent - -# hdfs location of jars -hdfs.default.classpath.dir=/tmp/kafka/lib - -# number of test events to be generated -event.count=1000 - -# hadoop id and group -hadoop.job.ugi=kafka,hadoop - -# kafka server uri -kafka.server.uri=tcp://localhost:9092 - -# hdfs location of input directory -input=/tmp/kafka/data - -# hdfs location of output directory -output=/tmp/kafka/output - -# limit the number of events to be fetched; -# value -1 means no limitation -kafka.request.limit=-1 - -# kafka parameters -client.buffer.size=1048576 -client.so.timeout=60000