Github user mbalassi commented on a diff in the pull request:
https://github.com/apache/flink/pull/459#discussion_r26050037
--- Diff:
flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaConsumerIterator.java
---
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.api.simple;
+
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import kafka.api.FetchRequest;
+import kafka.api.FetchRequestBuilder;
+import kafka.api.PartitionOffsetRequestInfo;
+import kafka.common.TopicAndPartition;
+import kafka.javaapi.FetchResponse;
+import kafka.javaapi.OffsetResponse;
+import kafka.javaapi.PartitionMetadata;
+import kafka.javaapi.TopicMetadata;
+import kafka.javaapi.TopicMetadataRequest;
+import kafka.javaapi.consumer.SimpleConsumer;
+import kafka.message.MessageAndOffset;
+
+/**
+ * Iterates the records received from a partition of a Kafka topic as byte
arrays.
+ */
public class KafkaConsumerIterator implements Serializable {

	private static final long serialVersionUID = 1L;

	// Default back-off in milliseconds when a fetch returns no data.
	private static final long DEFAULT_WAIT_ON_EMPTY_FETCH = 1000L;

	// Known broker host names used for the initial leader lookup.
	private List<String> hosts;
	// Topic and partition this iterator reads from.
	private String topic;
	private int port;
	private int partition;
	// Offset of the next record to read; advanced in nextWithOffset().
	private long readOffset;
	// Back-off in milliseconds applied when a fetch returns no data.
	private long waitOnEmptyFetch;
	// Low-level Kafka consumer; not serialized (transient), created in initialize().
	private transient SimpleConsumer consumer;
	private List<String> replicaBrokers;
	// Client id reported to the broker, derived from topic and partition in initialize().
	private String clientName;

	// Iterator over the messages of the current fetch response.
	private transient Iterator<MessageAndOffset> iter;
	private transient FetchResponse fetchResponse;
+
+ /**
+ * Constructor with configurable wait time on empty fetch. For
connecting to the Kafka service
+ * we use the so called simple or low level Kafka API thus directly
connecting to one of the brokers.
+ *
+ * @param hostName Hostname of a known Kafka broker
+ * @param port Port of the known Kafka broker
+ * @param topic Name of the topic to listen to
+ * @param partition Partition in the chosen topic
+ * @param waitOnEmptyFetch wait time on empty fetch in millis
+ */
+ public KafkaConsumerIterator(String hostName, int port, String topic,
int partition,
+ long waitOnEmptyFetch) {
+
+ this.hosts = new ArrayList<String>();
+ hosts.add(hostName);
+ this.port = port;
+
+ this.topic = topic;
+ this.partition = partition;
+ this.waitOnEmptyFetch = waitOnEmptyFetch;
+
+ replicaBrokers = new ArrayList<String>();
+ }
+
+ /**
+ * Constructor without configurable wait time on empty fetch. For
connecting to the Kafka service
+ * we use the so called simple or low level Kafka API thus directly
connecting to one of the brokers.
+ *
+ * @param hostName Hostname of a known Kafka broker
+ * @param port Port of the known Kafka broker
+ * @param topic Name of the topic to listen to
+ * @param partition Partition in the chosen topic
+ */
+ public KafkaConsumerIterator(String hostName, int port, String topic,
int partition){
+ this(hostName, port, topic, partition,
DEFAULT_WAIT_ON_EMPTY_FETCH);
+ }
+
+ //
--------------------------------------------------------------------------------------------
+ // Initializing a connection
+ //
--------------------------------------------------------------------------------------------
+
+ /**
+ * Initializes the connection by detecting the leading broker of
+ * the topic and establishing a connection to it.
+ */
+ private void initialize() throws InterruptedException {
+ PartitionMetadata metadata;
+ do {
+ metadata = findLeader(hosts, port, topic, partition);
+ try {
+ Thread.sleep(waitOnEmptyFetch);
+ } catch (InterruptedException e) {
+ throw new InterruptedException("Establishing
connection to Kafka failed");
+ }
+ } while (metadata == null);
+
+ if (metadata.leader() == null) {
+ throw new RuntimeException("Can't find Leader for Topic
and Partition. (at " + hosts.get(0)
+ + ":" + port);
+ }
+
+ String leadBroker = metadata.leader().host();
+ clientName = "Client_" + topic + "_" + partition;
+
+ consumer = new SimpleConsumer(leadBroker, port, 100000, 64 *
1024, clientName);
+ }
+
+ /**
+ * Initializes a connection from the earliest available offset.
+ */
+ public void initializeFromBeginning() throws InterruptedException {
+ initialize();
+ readOffset = getLastOffset(consumer, topic, partition,
+ kafka.api.OffsetRequest.EarliestTime(),
clientName);
+
+ resetFetchResponse(readOffset);
+ }
+
+ /**
+ * Initializes a connection from the latest available offset.
+ */
+ public void initializeFromCurrent() throws InterruptedException {
+ initialize();
+ readOffset = getLastOffset(consumer, topic, partition,
+ kafka.api.OffsetRequest.LatestTime(),
clientName);
+
+ resetFetchResponse(readOffset);
+ }
+
+ /**
+ * Initializes a connection from the specified offset.
+ *
+ * @param offset Desired Kafka offset
+ */
+ public void initializeFromOffset(long offset) throws
InterruptedException {
+ initialize();
+ readOffset = offset;
+ resetFetchResponse(readOffset);
+ }
+
+
+ //
--------------------------------------------------------------------------------------------
+ // Iterator methods
+ //
--------------------------------------------------------------------------------------------
+
+ /**
+ * Convenience method to emulate iterator behaviour.
+ *
+ * @return whether the iterator has a next element
+ */
+ public boolean hasNext() {
+ return true;
+ }
+
+ /**
+ * Returns the next message received from Kafka as a
+ * byte array.
+ *
+ * @return next message as a byte array.
+ */
+ public byte[] next() {
+ return nextWithOffset().getMessage();
+ }
+
+ /**
+ * Returns the next message and its offset received from
+ * Kafka encapsulated in a POJO.
+ *
+ * @return next message and its offset.
+ */
+ public MessageWithOffset nextWithOffset() {
+
+ synchronized (fetchResponse) {
+ while (!iter.hasNext()) {
+ resetFetchResponse(readOffset);
+ try {
+ Thread.sleep(waitOnEmptyFetch);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+
+ MessageAndOffset messageAndOffset = iter.next();
+ long currentOffset = messageAndOffset.offset();
+
+ while (currentOffset < readOffset) {
+ messageAndOffset = iter.next();
+ currentOffset = messageAndOffset.offset();
+ }
+
+ readOffset = messageAndOffset.nextOffset();
+ ByteBuffer payload =
messageAndOffset.message().payload();
+
+ byte[] bytes = new byte[payload.limit()];
+ payload.get(bytes);
+ return new MessageWithOffset(messageAndOffset.offset(),
bytes);
+ }
+ }
+
+ /**
+ * Resets the iterator to a given offset.
+ *
+ * @param offset Desired Kafka offset.
+ */
+ public void reset(long offset) {
+ synchronized (fetchResponse) {
+ readOffset = offset;
+ resetFetchResponse(offset);
+ }
+ }
+
+ //
--------------------------------------------------------------------------------------------
+ // Internal utilities
+ //
--------------------------------------------------------------------------------------------
+
+ private void resetFetchResponse(long offset) {
+ FetchRequest req = new
FetchRequestBuilder().clientId(clientName)
+ .addFetch(topic, partition, offset,
100000).build();
+ System.out.println(clientName + " " + topic + " " + partition);
--- End diff --
Yes, I forgot it.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---