Repository: hive
Updated Branches:
  refs/heads/master 366eaceff -> beaa8a8c3


HIVE-20561: Use the position of the Kafka Consumer to track progress instead of Consumer Records offsets (Slim Bouguerra, reviewed by Vineet Garg)
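
For context on the change: on a transactional topic, the offsets of the records
poll() returns can contain gaps, because transaction commit/abort control
markers occupy offsets but are never handed to the application, while
Consumer#position() always reports the offset of the next record to be fetched.
A minimal sketch of the distinction the patch relies on; the reachedEnd()
helper and its arguments are illustrative, not part of the patch:

    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.common.TopicPartition;

    final class PositionVsOffsetSketch {
      static boolean reachedEnd(Consumer<byte[], byte[]> consumer,
          TopicPartition topicPartition, long endOffset, long pollTimeoutMs) {
        ConsumerRecords<byte[], byte[]> batch = consumer.poll(pollTimeoutMs);
        long lastRecordOffset = -1L;
        for (ConsumerRecord<byte[], byte[]> record : batch) {
          lastRecordOffset = record.offset();
        }
        // On a transactional topic lastRecordOffset + 1 can lag position():
        // control records occupy offsets but are never returned by poll(),
        // so the record offsets alone can never "reach" the end offset.
        return consumer.position(topicPartition) >= endOffset;
      }
    }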


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/beaa8a8c
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/beaa8a8c
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/beaa8a8c

Branch: refs/heads/master
Commit: beaa8a8c3e041f026d1cf5b85d120a5709db73b0
Parents: 366eace
Author: Slim Bouguerra <slim.bougue...@gmail.com>
Authored: Tue Sep 18 10:54:52 2018 -0700
Committer: Vineet Garg <vg...@apache.org>
Committed: Tue Sep 18 10:56:09 2018 -0700

----------------------------------------------------------------------
 .../hive/kafka/KafkaPullerRecordReader.java     |  15 +-
 .../hadoop/hive/kafka/KafkaRecordIterator.java  | 195 +++++++++----------
 .../hive/kafka/KafkaRecordIteratorTest.java     |  20 +-
 3 files changed, 121 insertions(+), 109 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/beaa8a8c/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaPullerRecordReader.java
----------------------------------------------------------------------
diff --git a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaPullerRecordReader.java b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaPullerRecordReader.java
index 4f0ee94..06a10b4 100644
--- a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaPullerRecordReader.java
+++ b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaPullerRecordReader.java
@@ -89,7 +89,7 @@ import java.util.Properties;
       LOG.debug("Consumer poll timeout [{}] ms", pollTimeout);
       this.recordsCursor =
           startOffset == endOffset ?
-              new KafkaRecordIterator.EmptyIterator() :
+              new EmptyIterator() :
               new KafkaRecordIterator(consumer, topicPartition, startOffset, endOffset, pollTimeout);
       started = true;
     }
@@ -157,4 +157,17 @@ import java.util.Properties;
       consumer.close();
     }
   }
+
+  /**
+   * Empty iterator for empty splits when startOffset == endOffset; added to avoid a clumsy if condition.
+   */
+  private static final class EmptyIterator implements Iterator<ConsumerRecord<byte[], byte[]>> {
+    @Override public boolean hasNext() {
+      return false;
+    }
+
+    @Override public ConsumerRecord<byte[], byte[]> next() {
+      throw new IllegalStateException("this is an empty iterator");
+    }
+  }
 }
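
A side note on the design, not part of the patch: the JDK already ships an
equivalent empty iterator; the hand-rolled EmptyIterator above differs only in
that next() throws IllegalStateException rather than NoSuchElementException.
A sketch of that hypothetical alternative:

    import java.util.Collections;
    import java.util.Iterator;

    import org.apache.kafka.clients.consumer.ConsumerRecord;

    final class EmptyCursorSketch {
      // Hypothetical alternative to the patch's EmptyIterator; hasNext() is
      // always false, but next() throws NoSuchElementException instead.
      static Iterator<ConsumerRecord<byte[], byte[]>> emptyCursor() {
        return Collections.emptyIterator();
      }
    }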

http://git-wip-us.apache.org/repos/asf/hive/blob/beaa8a8c/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaRecordIterator.java
----------------------------------------------------------------------
diff --git a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaRecordIterator.java b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaRecordIterator.java
index 7daa3e2..c252455 100644
--- a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaRecordIterator.java
+++ b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaRecordIterator.java
@@ -18,14 +18,13 @@
 
 package org.apache.hadoop.hive.kafka;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Stopwatch;
-import com.google.common.collect.ImmutableList;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.TimeoutException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -36,108 +35,120 @@ import java.util.List;
 import java.util.concurrent.TimeUnit;
 
 /**
- * Iterator over Kafka Records to read records from a single topic partition inclusive start exclusive end.
- * <p>
- * If {@code startOffset} is not null will seek up to that offset
- * Else If {@code startOffset} is null will seek to beginning see
- * {@link org.apache.kafka.clients.consumer.Consumer#seekToBeginning(java.util.Collection)}
- * <p>
- * When provided with an end offset it will return records up to the record with offset == endOffset - 1,
- * Else If end offsets is null it will read up to the current end see
- * {@link org.apache.kafka.clients.consumer.Consumer#endOffsets(java.util.Collection)}
- * <p>
- * Current implementation of this Iterator will throw and exception if can not poll up to the endOffset - 1
+ * Iterator over Kafka Records to read records from a single topic partition inclusive start, exclusive end.
  */
 public class KafkaRecordIterator implements Iterator<ConsumerRecord<byte[], byte[]>> {
   private static final Logger LOG = LoggerFactory.getLogger(KafkaRecordIterator.class);
+  private static final String
+      POLL_TIMEOUT_HINT =
+      String.format("Try increasing poll timeout using Hive Table property [%s]",
+          KafkaStreamingUtils.HIVE_KAFKA_POLL_TIMEOUT);
+  private static final String
+      ERROR_POLL_TIMEOUT_FORMAT =
+      "Consumer returned [0] records due to exhausted poll timeout [%s]ms from TopicPartition:[%s] "
+          + "start Offset [%s], current consumer position [%s], target end offset [%s], "
+          + POLL_TIMEOUT_HINT;
 
   private final Consumer<byte[], byte[]> consumer;
   private final TopicPartition topicPartition;
-  private long endOffset;
-  private long startOffset;
+  private final long endOffset;
+  private final long startOffset;
   private final long pollTimeoutMs;
   private final Stopwatch stopwatch = Stopwatch.createUnstarted();
   private ConsumerRecords<byte[], byte[]> records;
-  private long currentOffset;
+  private long consumerPosition;
   private ConsumerRecord<byte[], byte[]> nextRecord;
   private boolean hasMore = true;
-  private final boolean started;
-
-  //Kafka consumer poll method return an iterator of records.
   private Iterator<ConsumerRecord<byte[], byte[]>> consumerRecordIterator = null;
 
   /**
-   * @param consumer       functional kafka consumer
-   * @param topicPartition kafka topic partition
-   * @param startOffset    start position of stream.
-   * @param endOffset      requested end position. If null will read up to current last
-   * @param pollTimeoutMs  poll time out in ms
+   * Iterator over Kafka Records over a single {@code topicPartition}, inclusive {@code startOffset},
+   * up to exclusive {@code endOffset}.
+   * <p>
+   * If {@code requestedStartOffset} is not null, it will seek to that offset;
+   * else, if {@code requestedStartOffset} is null, it will seek to the beginning, see
+   * {@link org.apache.kafka.clients.consumer.Consumer#seekToBeginning(java.util.Collection)}
+   * <p>
+   * When provided with {@code requestedEndOffset}, it will return records up to consumer position == endOffset;
+   * else, if {@code requestedEndOffset} is null, it will read up to the current end of the stream, see
+   * {@link org.apache.kafka.clients.consumer.Consumer#seekToEnd(java.util.Collection)}
+   * <p>
+   * @param consumer       functional kafka consumer.
+   * @param topicPartition kafka topic partition.
+   * @param requestedStartOffset requested start position.
+   * @param requestedEndOffset   requested end position. If null, will read up to the current last.
+   * @param pollTimeoutMs  poll timeout in ms.
    */
   KafkaRecordIterator(Consumer<byte[], byte[]> consumer,
       TopicPartition topicPartition,
-      @Nullable Long startOffset,
-      @Nullable Long endOffset,
+      @Nullable Long requestedStartOffset,
+      @Nullable Long requestedEndOffset,
       long pollTimeoutMs) {
     this.consumer = Preconditions.checkNotNull(consumer, "Consumer can not be null");
     this.topicPartition = Preconditions.checkNotNull(topicPartition, "Topic partition can not be null");
     this.pollTimeoutMs = pollTimeoutMs;
-    Preconditions.checkState(this.pollTimeoutMs > 0, "poll timeout has to be positive number");
-    this.startOffset = startOffset == null ? -1L : startOffset;
-    this.endOffset = endOffset == null ? -1L : endOffset;
-    assignAndSeek();
-    this.started = true;
-  }
-
-  KafkaRecordIterator(Consumer<byte[], byte[]> consumer, TopicPartition tp, long pollTimeoutMs) {
-    this(consumer, tp, null, null, pollTimeoutMs);
-  }
-
-  private void assignAndSeek() {
+    Preconditions.checkState(this.pollTimeoutMs > 0, "Poll timeout has to be a positive number");
+    final List<TopicPartition> topicPartitionList = Collections.singletonList(topicPartition);
     // assign topic partition to consumer
-    final List<TopicPartition> topicPartitionList = ImmutableList.of(topicPartition);
-    if (LOG.isTraceEnabled()) {
-      stopwatch.reset().start();
+    consumer.assign(topicPartitionList);
+
+    // Handle the end offset first, in case we have to seek to the end to figure out the last available offset
+    if (requestedEndOffset == null) {
+      consumer.seekToEnd(topicPartitionList);
+      this.endOffset = consumer.position(topicPartition);
+      LOG.info("End Offset set to [{}]", this.endOffset);
+    } else {
+      this.endOffset = requestedEndOffset;
     }
 
-    consumer.assign(topicPartitionList);
-    // compute offsets and seek to start
-    if (startOffset > -1) {
-      LOG.info("Seeking to offset [{}] of topic partition [{}]", startOffset, 
topicPartition);
-      consumer.seek(topicPartition, startOffset);
+    // seek to start offsets
+    if (requestedStartOffset != null) {
+      LOG.info("Seeking to offset [{}] of topic partition [{}]", 
requestedStartOffset, topicPartition);
+      consumer.seek(topicPartition, requestedStartOffset);
+      this.startOffset = consumer.position(topicPartition);
+      if (this.startOffset != requestedStartOffset) {
+        LOG.warn("Current Start Offset [{}] is different form the requested 
start position [{}]",
+            this.startOffset,
+            requestedStartOffset);
+      }
     } else {
-      LOG.info("Seeking to beginning of topic partition [{}]", topicPartition);
+      // case seek to beginning of stream
+      consumer.seekToBeginning(Collections.singleton(topicPartition));
       // seekToBeginning is lazy thus need to call position() or poll(0)
-      this.consumer.seekToBeginning(Collections.singleton(topicPartition));
-      startOffset = consumer.position(topicPartition);
-    }
-    if (endOffset == -1) {
-      this.endOffset = consumer.endOffsets(topicPartitionList).get(topicPartition);
-      LOG.info("EndOffset set to {}", endOffset);
+      this.startOffset = consumer.position(topicPartition);
+      LOG.info("Consumer at beginning of topic partition [{}], current start 
offset [{}]",
+          topicPartition,
+          this.startOffset);
     }
-    currentOffset = consumer.position(topicPartition);
-    Preconditions.checkState(this.endOffset >= currentOffset,
-        "End offset [%s] need to be greater than start offset [%s]",
+
+    consumerPosition = consumer.position(topicPartition);
+    Preconditions.checkState(this.endOffset >= consumerPosition,
+        "End offset [%s] need to be greater or equal than start offset [%s]",
         this.endOffset,
-        currentOffset);
-    LOG.info("Kafka Iterator ready, assigned TopicPartition [{}]; startOffset 
[{}]; endOffset [{}]",
+        consumerPosition);
+    LOG.info("Kafka Iterator assigned to TopicPartition [{}]; start Offset 
[{}]; end Offset [{}]",
         topicPartition,
-        currentOffset,
+        consumerPosition,
         this.endOffset);
-    if (LOG.isTraceEnabled()) {
-      stopwatch.stop();
-      LOG.trace("Time to assign and seek [{}] ms", 
stopwatch.elapsed(TimeUnit.MILLISECONDS));
-    }
+
   }
 
-  @Override
-  public boolean hasNext() {
+  @VisibleForTesting KafkaRecordIterator(Consumer<byte[], byte[]> consumer, TopicPartition tp, long pollTimeoutMs) {
+    this(consumer, tp, null, null, pollTimeoutMs);
+  }
+
+  /**
+   * @throws IllegalStateException if the kafka consumer poll call can not reach the target offset.
+   * @return true if has more records to be consumed.
+   */
+  @Override public boolean hasNext() {
     /*
     Poll more records from Kafka queue IF:
-    Initial poll case -> (records == null)
+    Initial poll -> (records == null)
       OR
-    Need to poll at least one more record (currentOffset + 1 < endOffset) AND consumerRecordIterator is empty (!hasMore)
+    Need to poll at least one more record (consumerPosition < endOffset) AND consumerRecordIterator is empty (!hasMore)
     */
-    if (!hasMore && currentOffset + 1 < endOffset || records == null) {
+    if (!hasMore && consumerPosition < endOffset || records == null) {
       pollRecords();
       findNext();
     }
@@ -145,65 +156,49 @@ public class KafkaRecordIterator implements Iterator<ConsumerRecord<byte[], byte
   }
 
   /**
-   * Poll more records or Fail with {@link TimeoutException} if no records returned before reaching target end offset.
+   * Poll more records from the Kafka Broker.
+   *
+   * @throws IllegalStateException if no records returned before consumer position reaches target end offset.
    */
   private void pollRecords() {
     if (LOG.isTraceEnabled()) {
       stopwatch.reset().start();
     }
-    Preconditions.checkArgument(started);
     records = consumer.poll(pollTimeoutMs);
     if (LOG.isTraceEnabled()) {
       stopwatch.stop();
       LOG.trace("Pulled [{}] records in [{}] ms", records.count(), 
stopwatch.elapsed(TimeUnit.MILLISECONDS));
     }
     // Fail if we can not poll within one lap of pollTimeoutMs.
-    if (records.isEmpty() && currentOffset < endOffset) {
-      throw new TimeoutException(String.format("Current offset: [%s]-TopicPartition:[%s], target End offset:[%s]."
-              + "Consumer returned 0 record due to exhausted poll timeout [%s]ms, try increasing[%s]",
-          currentOffset,
-          topicPartition.toString(),
-          endOffset,
+    if (records.isEmpty() && consumer.position(topicPartition) < endOffset) {
+      throw new IllegalStateException(String.format(ERROR_POLL_TIMEOUT_FORMAT,
           pollTimeoutMs,
-          KafkaStreamingUtils.HIVE_KAFKA_POLL_TIMEOUT));
+          topicPartition.toString(),
+          startOffset,
+          consumer.position(topicPartition),
+          endOffset));
     }
     consumerRecordIterator = records.iterator();
+    consumerPosition = consumer.position(topicPartition);
   }
 
   @Override public ConsumerRecord<byte[], byte[]> next() {
     ConsumerRecord<byte[], byte[]> value = nextRecord;
     Preconditions.checkState(value.offset() < endOffset);
     findNext();
-    return Preconditions.checkNotNull(value);
+    return value;
   }
 
   /**
-   * Find the next element in the batch of returned records by previous poll or set hasMore to false tp poll more next
-   * call to {@link KafkaRecordIterator#hasNext()}.
+   * Find the next element in the current batch OR schedule {@link KafkaRecordIterator#pollRecords()} (hasMore = false).
    */
   private void findNext() {
     if (consumerRecordIterator.hasNext()) {
       nextRecord = consumerRecordIterator.next();
-      hasMore = true;
-      if (nextRecord.offset() < endOffset) {
-        currentOffset = nextRecord.offset();
-        return;
-      }
-    }
-    hasMore = false;
-    nextRecord = null;
-  }
-
-  /**
-   * Empty iterator for empty splits when startOffset == endOffset, this is added to avoid clumsy if condition.
-   */
-  protected static final class EmptyIterator implements Iterator<ConsumerRecord<byte[], byte[]>>  {
-    @Override public boolean hasNext() {
-      return false;
-    }
-
-    @Override public ConsumerRecord<byte[], byte[]> next() {
-      throw new IllegalStateException("this is an empty iterator");
+      hasMore = nextRecord.offset() < endOffset;
+    } else {
+      hasMore = false;
+      nextRecord = null;
     }
   }
 }
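
For orientation, a sketch of how the rewritten iterator is driven: hasNext()
re-polls until consumer.position() reaches the end offset and throws
IllegalStateException if a poll comes back empty before that. The drain()
wrapper and the offsets are illustrative only, and since the constructor is
package-private the sketch assumes it lives in org.apache.hadoop.hive.kafka:

    package org.apache.hadoop.hive.kafka;

    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.common.TopicPartition;

    final class IteratorUsageSketch {
      static long drain(Consumer<byte[], byte[]> consumer, TopicPartition topicPartition) {
        KafkaRecordIterator iterator = new KafkaRecordIterator(consumer, topicPartition,
            0L /* start, inclusive */, 100L /* end, exclusive */, 5000L /* poll timeout ms */);
        long count = 0L;
        while (iterator.hasNext()) {
          ConsumerRecord<byte[], byte[]> record = iterator.next();
          count++; // a real reader would hand each record to a deserializer here
        }
        return count;
      }
    }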

http://git-wip-us.apache.org/repos/asf/hive/blob/beaa8a8c/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaRecordIteratorTest.java
----------------------------------------------------------------------
diff --git a/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaRecordIteratorTest.java b/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaRecordIteratorTest.java
index 98a5568..e048fb3 100644
--- a/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaRecordIteratorTest.java
+++ b/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaRecordIteratorTest.java
@@ -38,7 +38,6 @@ import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.utils.Time;
 import org.junit.After;
@@ -192,9 +191,13 @@ public class KafkaRecordIteratorTest {
     recordReader.close();
   }
 
-  @Test(expected = TimeoutException.class) public void testPullingBeyondLimit() {
-    this.kafkaRecordIterator = new KafkaRecordIterator(this.consumer, TOPIC_PARTITION, 0L, 101L, POLL_TIMEOUT_MS);
-    this.compareIterator(RECORDS, this.kafkaRecordIterator);
+  @Test(expected = IllegalStateException.class) public void testPullingBeyondLimit() {
+    // FYI in the Tx world, commits can introduce offset gaps; therefore
+    // (RECORD_NUMBER + 1) as a beyond-limit offset is only true if the topic has no Tx or any control msg.
+    this.kafkaRecordIterator = new KafkaRecordIterator(this.consumer, TOPIC_PARTITION, 19383L, (long) RECORD_NUMBER + 1, POLL_TIMEOUT_MS);
+    this.compareIterator(RECORDS.stream()
+        .filter((consumerRecord) -> consumerRecord.offset() >= 19383L)
+        .collect(Collectors.toList()), this.kafkaRecordIterator);
   }
 
   @Test(expected = IllegalStateException.class) public void testPullingStartGreaterThanEnd() {
@@ -202,13 +205,13 @@ public class KafkaRecordIteratorTest {
     this.compareIterator(RECORDS, this.kafkaRecordIterator);
   }
 
-  @Test(expected = TimeoutException.class) public void testPullingFromEmptyTopic() {
+  @Test(expected = IllegalStateException.class) public void testPullingFromEmptyTopic() {
     this.kafkaRecordIterator =
         new KafkaRecordIterator(this.consumer, new TopicPartition("noHere", 
0), 0L, 100L, POLL_TIMEOUT_MS);
     this.compareIterator(RECORDS, this.kafkaRecordIterator);
   }
 
-  @Test(expected = TimeoutException.class) public void testPullingFromEmptyPartition() {
+  @Test(expected = IllegalStateException.class) public void testPullingFromEmptyPartition() {
     this.kafkaRecordIterator =
         new KafkaRecordIterator(this.consumer, new TopicPartition(TOPIC, 1), 0L, 100L, POLL_TIMEOUT_MS);
     this.compareIterator(RECORDS, this.kafkaRecordIterator);
@@ -268,18 +271,19 @@ public class KafkaRecordIteratorTest {
     consumerProps.setProperty("enable.auto.commit", "false");
     consumerProps.setProperty("auto.offset.reset", "none");
     consumerProps.setProperty("bootstrap.servers", "127.0.0.1:9092");
-    this.conf.set("kafka.bootstrap.servers", "127.0.0.1:9092");
+    conf.set("kafka.bootstrap.servers", "127.0.0.1:9092");
     consumerProps.setProperty("key.deserializer", 
ByteArrayDeserializer.class.getName());
     consumerProps.setProperty("value.deserializer", 
ByteArrayDeserializer.class.getName());
     consumerProps.setProperty("request.timeout.ms", "3002");
     consumerProps.setProperty("fetch.max.wait.ms", "3001");
     consumerProps.setProperty("session.timeout.ms", "3001");
     consumerProps.setProperty("metadata.max.age.ms", "100");
+    consumerProps.setProperty("max.poll.records", String.valueOf(RECORD_NUMBER 
- 1));
     this.consumer = new KafkaConsumer<>(consumerProps);
   }
 
   private static void sendData() {
-    LOG.info("Sending {} records", RECORD_NUMBER);
+    LOG.info("Sending [{}] records", RECORDS.size());
     RECORDS.stream()
         .map(consumerRecord -> new ProducerRecord<>(consumerRecord.topic(),
             consumerRecord.partition(),

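One test-setup detail worth noting: capping max.poll.records below the split's
record count forces the iterator to poll more than once, so the new
position-based cursor is exercised across batch boundaries instead of
finishing in a single poll. A sketch of that configuration in isolation; the
values are illustrative, the property names are standard Kafka consumer configs:

    import java.util.Properties;

    final class MultiPollSetupSketch {
      static Properties consumerProps(int recordNumber) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "127.0.0.1:9092");
        // Fewer records per poll than the split holds => the iterator must
        // poll more than once, exercising consumer.position() between batches.
        props.setProperty("max.poll.records", String.valueOf(recordNumber - 1));
        return props;
      }
    }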