Repository: crunch
Updated Branches:
  refs/heads/master 5609b0143 -> 8e5c2ad37
CRUNCH-654: KafkaSource should use the new Kafka Consumer API instead of the SimpleConsumer. Contributed by Bryan Baugher.

Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/8e5c2ad3
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/8e5c2ad3
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/8e5c2ad3

Branch: refs/heads/master
Commit: 8e5c2ad37c6e2f0fbc428bddf69da3644d535456
Parents: 5609b01
Author: Josh Wills <[email protected]>
Authored: Mon Dec 11 09:56:38 2017 -0800
Committer: Josh Wills <[email protected]>
Committed: Mon Dec 11 09:58:17 2017 -0800

----------------------------------------------------------------------
 .../org/apache/crunch/kafka/KafkaUtils.java    | 22 +++++++++++++++-----
 .../crunch/kafka/record/KafkaRecordReader.java |  8 +++----
 .../org/apache/crunch/kafka/KafkaUtilsIT.java  | 12 +++++++++--
 .../kafka/utils/KafkaBrokerTestHarness.java    | 13 ++++++++++--
 pom.xml                                        |  2 +-
 5 files changed, 42 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/8e5c2ad3/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaUtils.java
----------------------------------------------------------------------
diff --git a/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaUtils.java b/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaUtils.java
index f3da5e9..2ed6412 100644
--- a/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaUtils.java
+++ b/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaUtils.java
@@ -33,11 +33,13 @@ import org.apache.commons.lang.StringUtils;
 import org.apache.crunch.CrunchRuntimeException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.network.ListenerName;
 import org.apache.kafka.common.protocol.SecurityProtocol;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import scala.Option;
+import scala.collection.JavaConversions;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -147,7 +149,9 @@ public class KafkaUtils {
    *                                  the topics are {@code null}, empty or blank, or if there is an error parsing the
    *                                  properties.
    * @throws IllegalStateException if there is an error communicating with the Kafka cluster to retrieve information.
+   * @deprecated As of 1.0. Use beginning/end offset APIs on {@link org.apache.kafka.clients.consumer.Consumer}
    */
+  @Deprecated
  public static Map<TopicPartition, Long> getBrokerOffsets(Properties properties, long time, String... topics) {
    if (properties == null)
      throw new IllegalArgumentException("properties cannot be null");
@@ -179,7 +183,7 @@ public class KafkaUtils {
         topicMetadataResponse = consumer.send(topicMetadataRequest);
         break;
       } catch (Exception err) {
-        EndPoint endpoint = broker.endPoints().get(SecurityProtocol.PLAINTEXT).get();
+        EndPoint endpoint = JavaConversions.seqAsJavaList(broker.endPoints()).get(0);
         LOG.warn(String.format("Fetching topic metadata for topic(s) '%s' from broker '%s' failed",
             Arrays.toString(topics), endpoint.host()), err);
       } finally {
@@ -209,7 +213,12 @@ public class KafkaUtils {
         throw new CrunchRuntimeException("Unable to find leader for topic:"+metadata.topic()
             +" partition:"+partition.partitionId());
       }
-      Broker leader = new Broker(0, brokerEndPoint.host(), brokerEndPoint.port(), SecurityProtocol.PLAINTEXT);
+
+      EndPoint endPoint = new EndPoint(brokerEndPoint.host(), brokerEndPoint.port(),
+          ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT), SecurityProtocol.PLAINTEXT);
+
+      Broker leader = new Broker(0, JavaConversions.asScalaBuffer(Arrays.asList(endPoint)),
+          Option.<String>empty());
 
       if (brokerRequests.containsKey(leader))
         requestInfo = brokerRequests.get(leader);
@@ -279,7 +288,7 @@ public class KafkaUtils {
    */
   private static SimpleConsumer getSimpleConsumer(final Broker broker) {
     // BrokerHost, BrokerPort, timeout, buffer size, client id
-    EndPoint endpoint = broker.endPoints().get(SecurityProtocol.PLAINTEXT).get();
+    EndPoint endpoint = JavaConversions.seqAsJavaList(broker.endPoints()).get(0);
     return new SimpleConsumer(endpoint.host(), endpoint.port(), 100000, 64 * 1024, CLIENT_ID);
   }
 
@@ -309,7 +318,10 @@ public class KafkaUtils {
         throw new IllegalArgumentException("Unable to parse host/port from broker string : ["
             + Arrays.toString(brokerHostPort) + "] from broker list : [" + Arrays.toString(brokerPortList) + "]");
       try {
-        brokers.add(new Broker(0, brokerHostPort[0], Integer.parseInt(brokerHostPort[1]), SecurityProtocol.PLAINTEXT));
+        EndPoint endPoint = new EndPoint(brokerHostPort[0], Integer.parseInt(brokerHostPort[1]),
+            ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT), SecurityProtocol.PLAINTEXT);
+        brokers.add(new Broker(0, JavaConversions.asScalaBuffer(Arrays.asList(endPoint)),
+            Option.<String>empty()));
       } catch (NumberFormatException e) {
         throw new IllegalArgumentException("Error parsing broker port : " + brokerHostPort[1], e);
       }

http://git-wip-us.apache.org/repos/asf/crunch/blob/8e5c2ad3/crunch-kafka/src/main/java/org/apache/crunch/kafka/record/KafkaRecordReader.java
----------------------------------------------------------------------
diff --git a/crunch-kafka/src/main/java/org/apache/crunch/kafka/record/KafkaRecordReader.java b/crunch-kafka/src/main/java/org/apache/crunch/kafka/record/KafkaRecordReader.java
index ec138b3..ef2ec49 100644
--- a/crunch-kafka/src/main/java/org/apache/crunch/kafka/record/KafkaRecordReader.java
+++ b/crunch-kafka/src/main/java/org/apache/crunch/kafka/record/KafkaRecordReader.java
@@ -17,9 +17,7 @@
  */
 package org.apache.crunch.kafka.record;
 
-import kafka.api.OffsetRequest;
 import org.apache.crunch.CrunchRuntimeException;
-import org.apache.crunch.kafka.KafkaUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.RecordReader;
@@ -214,10 +212,10 @@ public class KafkaRecordReader<K, V> extends RecordReader<ConsumerRecord<K, V>,
    * @return the earliest offset of the topic partition
    */
   protected long getEarliestOffset() {
-    Map<TopicPartition, Long> brokerOffsets = KafkaUtils
-        .getBrokerOffsets(kafkaConnectionProperties, OffsetRequest.EarliestTime(), topicPartition.topic());
+    Map<TopicPartition, Long> brokerOffsets = consumer.beginningOffsets(
+        Collections.singletonList(topicPartition));
     Long offset = brokerOffsets.get(topicPartition);
-    if (offset == null) {
+    if(offset == null){
       LOG.debug("Unable to determine earliest offset for {} so returning 0", topicPartition);
       return 0L;
     }

http://git-wip-us.apache.org/repos/asf/crunch/blob/8e5c2ad3/crunch-kafka/src/test/java/org/apache/crunch/kafka/KafkaUtilsIT.java
----------------------------------------------------------------------
diff --git a/crunch-kafka/src/test/java/org/apache/crunch/kafka/KafkaUtilsIT.java b/crunch-kafka/src/test/java/org/apache/crunch/kafka/KafkaUtilsIT.java
index 38c3fce..dc4ea82 100644
--- a/crunch-kafka/src/test/java/org/apache/crunch/kafka/KafkaUtilsIT.java
+++ b/crunch-kafka/src/test/java/org/apache/crunch/kafka/KafkaUtilsIT.java
@@ -18,11 +18,13 @@ package org.apache.crunch.kafka;
 
 import kafka.cluster.Broker;
+import kafka.cluster.EndPoint;
 import org.apache.crunch.kafka.ClusterTest;
 import org.apache.crunch.kafka.KafkaUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.network.ListenerName;
 import org.apache.kafka.common.protocol.SecurityProtocol;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -30,6 +32,8 @@ import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TestName;
+import scala.Option;
+import scala.collection.JavaConversions;
 
 import java.io.IOException;
 import java.util.Arrays;
@@ -63,7 +67,9 @@ public class KafkaUtilsIT {
     String brokerHost = brokerHostPort[0];
     int brokerPort = Integer.parseInt(brokerHostPort[1]);
 
-    broker = new Broker(0, brokerHost, brokerPort, SecurityProtocol.PLAINTEXT);
+    EndPoint endPoint = new EndPoint(brokerHost, brokerPort,
+        ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT), SecurityProtocol.PLAINTEXT);
+    broker = new Broker(0, JavaConversions.asScalaBuffer(Arrays.asList(endPoint)), Option.<String>empty());
   }
 
   @AfterClass
@@ -181,7 +187,9 @@ public class KafkaUtilsIT {
 
   @Test
   public void getBrokerOffsetsSomeHostsUnavailable() throws IOException {
-    final Broker bad = new Broker(0, "dummyBrokerHost1", 0, SecurityProtocol.PLAINTEXT);
+    EndPoint endPoint = new EndPoint("dummyBrokerHost1", 0,
+        ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT), SecurityProtocol.PLAINTEXT);
+    final Broker bad = new Broker(0, JavaConversions.asScalaBuffer(Arrays.asList(endPoint)), Option.<String>empty());
     assertNotNull(KafkaUtils.getBrokerOffsets(Arrays.asList(broker, bad), kafka.api.OffsetRequest.LatestTime(), topic));
     assertNotNull(KafkaUtils.getBrokerOffsets(Arrays.asList(bad, broker), kafka.api.OffsetRequest.LatestTime(), topic));
   }

http://git-wip-us.apache.org/repos/asf/crunch/blob/8e5c2ad3/crunch-kafka/src/test/java/org/apache/crunch/kafka/utils/KafkaBrokerTestHarness.java
----------------------------------------------------------------------
diff --git a/crunch-kafka/src/test/java/org/apache/crunch/kafka/utils/KafkaBrokerTestHarness.java b/crunch-kafka/src/test/java/org/apache/crunch/kafka/utils/KafkaBrokerTestHarness.java
index f47f168..13a4e2c 100644
--- a/crunch-kafka/src/test/java/org/apache/crunch/kafka/utils/KafkaBrokerTestHarness.java
+++ b/crunch-kafka/src/test/java/org/apache/crunch/kafka/utils/KafkaBrokerTestHarness.java
@@ -17,17 +17,20 @@
 
 package org.apache.crunch.kafka.utils;
 
+import kafka.metrics.KafkaMetricsReporter;
 import kafka.server.KafkaConfig;
 import kafka.server.KafkaServer;
-import kafka.utils.Time;
 import org.apache.commons.io.FileUtils;
+import org.apache.kafka.common.utils.Time;
 import scala.Option;
+import scala.collection.JavaConversions;
 
 import java.io.File;
 import java.io.FileWriter;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Properties;
 
@@ -341,7 +344,8 @@ public class KafkaBrokerTestHarness extends ZookeeperTestHarness {
   }
 
   private static KafkaServer startBroker(KafkaConfig config) {
-    KafkaServer server = new KafkaServer(config, new SystemTime(), Option.<String>empty());
+    KafkaServer server = new KafkaServer(config, new SystemTime(), Option.<String>empty(),
+        JavaConversions.asScalaBuffer(Collections.<KafkaMetricsReporter>emptyList()));
     server.startup();
     return server;
   }
@@ -353,6 +357,11 @@ public class KafkaBrokerTestHarness extends ZookeeperTestHarness {
     }
 
     @Override
+    public long hiResClockMs() {
+      return System.currentTimeMillis();
+    }
+
+    @Override
     public long nanoseconds() {
       return System.nanoTime();
     }

http://git-wip-us.apache.org/repos/asf/crunch/blob/8e5c2ad3/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 248aa05..9af5374 100644
--- a/pom.xml
+++ b/pom.xml
@@ -105,7 +105,7 @@ under the License.
     <avro.classifier>hadoop2</avro.classifier>
 
     <hive.version>2.1.0</hive.version>
-    <kafka.version>0.10.0.1</kafka.version>
+    <kafka.version>0.10.2.1</kafka.version>
     <scala.base.version>2.11</scala.base.version>
     <scala.version>2.11.8</scala.version>
     <scalatest.version>2.2.4</scalatest.version>
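
----------------------------------------------------------------------
For context, the @deprecated tag added to KafkaUtils.getBrokerOffsets steers
callers to the beginning/end offset methods on the new Consumer API, which
the kafka.version bump to 0.10.2.1 makes available (they were introduced in
0.10.1). A minimal sketch of that replacement, assuming placeholder values:
the bootstrap address, topic name, and partition number below are
illustrative and not taken from this commit.

import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class OffsetLookupSketch {

  public static void main(String[] args) {
    Properties props = new Properties();
    // Placeholder broker address for illustration only.
    props.put("bootstrap.servers", "localhost:9092");
    props.put("key.deserializer", ByteArrayDeserializer.class.getName());
    props.put("value.deserializer", ByteArrayDeserializer.class.getName());

    // Hypothetical topic and partition.
    TopicPartition partition = new TopicPartition("my-topic", 0);
    Collection<TopicPartition> partitions = Collections.singletonList(partition);

    try (Consumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
      // Stands in for getBrokerOffsets(props, OffsetRequest.EarliestTime(), topic)
      Map<TopicPartition, Long> earliest = consumer.beginningOffsets(partitions);
      // Stands in for getBrokerOffsets(props, OffsetRequest.LatestTime(), topic)
      Map<TopicPartition, Long> latest = consumer.endOffsets(partitions);

      System.out.println("earliest=" + earliest.get(partition)
          + ", latest=" + latest.get(partition));
    }
  }
}

This is the same pattern KafkaRecordReader.getEarliestOffset now uses
internally via consumer.beginningOffsets, instead of routing offset requests
through a SimpleConsumer per broker.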
