Repository: hive Updated Branches: refs/heads/master 366eaceff -> beaa8a8c3
HIVE-20561: Use the position of the Kafka Consumer to track progress instead of Consumer Records offsets (Slim Bouguerra, reviewed by Vineet Garg) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/beaa8a8c Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/beaa8a8c Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/beaa8a8c Branch: refs/heads/master Commit: beaa8a8c3e041f026d1cf5b85d120a5709db73b0 Parents: 366eace Author: Slim Bouguerra <slim.bougue...@gmail.com> Authored: Tue Sep 18 10:54:52 2018 -0700 Committer: Vineet Garg <vg...@apache.org> Committed: Tue Sep 18 10:56:09 2018 -0700 ---------------------------------------------------------------------- .../hive/kafka/KafkaPullerRecordReader.java | 15 +- .../hadoop/hive/kafka/KafkaRecordIterator.java | 195 +++++++++---------- .../hive/kafka/KafkaRecordIteratorTest.java | 20 +- 3 files changed, 121 insertions(+), 109 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/beaa8a8c/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaPullerRecordReader.java ---------------------------------------------------------------------- diff --git a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaPullerRecordReader.java b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaPullerRecordReader.java index 4f0ee94..06a10b4 100644 --- a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaPullerRecordReader.java +++ b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaPullerRecordReader.java @@ -89,7 +89,7 @@ import java.util.Properties; LOG.debug("Consumer poll timeout [{}] ms", pollTimeout); this.recordsCursor = startOffset == endOffset ? 
- new KafkaRecordIterator.EmptyIterator() : + new EmptyIterator() : new KafkaRecordIterator(consumer, topicPartition, startOffset, endOffset, pollTimeout); started = true; } @@ -157,4 +157,17 @@ import java.util.Properties; consumer.close(); } } + + /** + * Empty iterator for empty splits when startOffset == endOffset, this is added to avoid clumsy if condition. + */ + private static final class EmptyIterator implements Iterator<ConsumerRecord<byte[], byte[]>> { + @Override public boolean hasNext() { + return false; + } + + @Override public ConsumerRecord<byte[], byte[]> next() { + throw new IllegalStateException("this is an empty iterator"); + } + } } http://git-wip-us.apache.org/repos/asf/hive/blob/beaa8a8c/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaRecordIterator.java ---------------------------------------------------------------------- diff --git a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaRecordIterator.java b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaRecordIterator.java index 7daa3e2..c252455 100644 --- a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaRecordIterator.java +++ b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaRecordIterator.java @@ -18,14 +18,13 @@ package org.apache.hadoop.hive.kafka; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.base.Stopwatch; -import com.google.common.collect.ImmutableList; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.errors.TimeoutException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,108 +35,120 @@ import java.util.List; import java.util.concurrent.TimeUnit; /** - * Iterator over Kafka Records to read records from a single topic partition inclusive start exclusive 
end. - * <p> - * If {@code startOffset} is not null will seek up to that offset - * Else If {@code startOffset} is null will seek to beginning see - * {@link org.apache.kafka.clients.consumer.Consumer#seekToBeginning(java.util.Collection)} - * <p> - * When provided with an end offset it will return records up to the record with offset == endOffset - 1, - * Else If end offsets is null it will read up to the current end see - * {@link org.apache.kafka.clients.consumer.Consumer#endOffsets(java.util.Collection)} - * <p> - * Current implementation of this Iterator will throw and exception if can not poll up to the endOffset - 1 + * Iterator over Kafka Records to read records from a single topic partition inclusive start, exclusive end. */ public class KafkaRecordIterator implements Iterator<ConsumerRecord<byte[], byte[]>> { private static final Logger LOG = LoggerFactory.getLogger(KafkaRecordIterator.class); + private static final String + POLL_TIMEOUT_HINT = + String.format("Try increasing poll timeout using Hive Table property [%s]", + KafkaStreamingUtils.HIVE_KAFKA_POLL_TIMEOUT); + private static final String + ERROR_POLL_TIMEOUT_FORMAT = + "Consumer returned [0] record due to exhausted poll timeout [%s]ms from TopicPartition:[%s] " + + "start Offset [%s], current consumer position [%s], target end offset [%s], " + + POLL_TIMEOUT_HINT; private final Consumer<byte[], byte[]> consumer; private final TopicPartition topicPartition; - private long endOffset; - private long startOffset; + private final long endOffset; + private final long startOffset; private final long pollTimeoutMs; private final Stopwatch stopwatch = Stopwatch.createUnstarted(); private ConsumerRecords<byte[], byte[]> records; - private long currentOffset; + private long consumerPosition; private ConsumerRecord<byte[], byte[]> nextRecord; private boolean hasMore = true; - private final boolean started; - - //Kafka consumer poll method return an iterator of records. 
private Iterator<ConsumerRecord<byte[], byte[]>> consumerRecordIterator = null; /** - * @param consumer functional kafka consumer - * @param topicPartition kafka topic partition - * @param startOffset start position of stream. - * @param endOffset requested end position. If null will read up to current last - * @param pollTimeoutMs poll time out in ms + * Iterator over Kafka Records over a single {@code topicPartition} inclusive {@code startOffset}, + * up to exclusive {@code endOffset}. + * <p> + * If {@code requestedStartOffset} is not null will seek up to that offset + * Else If {@code requestedStartOffset} is null will seek to beginning see + * {@link org.apache.kafka.clients.consumer.Consumer#seekToBeginning(java.util.Collection)} + * <p> + * When provided with {@code requestedEndOffset}, will return records up to consumer position == endOffset + * Else If {@code requestedEndOffset} is null it will read up to the current end of the stream + * {@link org.apache.kafka.clients.consumer.Consumer#seekToEnd(java.util.Collection)} + * <p> + * @param consumer functional kafka consumer. + * @param topicPartition kafka topic partition. + * @param requestedStartOffset requested start position. + * @param requestedEndOffset requested end position. If null will read up to current last + * @param pollTimeoutMs poll time out in ms. */ KafkaRecordIterator(Consumer<byte[], byte[]> consumer, TopicPartition topicPartition, - @Nullable Long startOffset, - @Nullable Long endOffset, + @Nullable Long requestedStartOffset, + @Nullable Long requestedEndOffset, long pollTimeoutMs) { this.consumer = Preconditions.checkNotNull(consumer, "Consumer can not be null"); this.topicPartition = Preconditions.checkNotNull(topicPartition, "Topic partition can not be null"); this.pollTimeoutMs = pollTimeoutMs; - Preconditions.checkState(this.pollTimeoutMs > 0, "poll timeout has to be positive number"); - this.startOffset = startOffset == null ? 
-1L : startOffset; - this.endOffset = endOffset == null ? -1L : endOffset; - assignAndSeek(); - this.started = true; - } - - KafkaRecordIterator(Consumer<byte[], byte[]> consumer, TopicPartition tp, long pollTimeoutMs) { - this(consumer, tp, null, null, pollTimeoutMs); - } - - private void assignAndSeek() { + Preconditions.checkState(this.pollTimeoutMs > 0, "Poll timeout has to be positive number"); + final List<TopicPartition> topicPartitionList = Collections.singletonList(topicPartition); // assign topic partition to consumer - final List<TopicPartition> topicPartitionList = ImmutableList.of(topicPartition); - if (LOG.isTraceEnabled()) { - stopwatch.reset().start(); + consumer.assign(topicPartitionList); + + // resolve the End Offset first, since we may have to seek to the end to figure out the last available offset + if (requestedEndOffset == null) { + consumer.seekToEnd(topicPartitionList); + this.endOffset = consumer.position(topicPartition); + LOG.info("End Offset set to [{}]", this.endOffset); + } else { + this.endOffset = requestedEndOffset; } - consumer.assign(topicPartitionList); - // compute offsets and seek to start - if (startOffset > -1) { - LOG.info("Seeking to offset [{}] of topic partition [{}]", startOffset, topicPartition); - consumer.seek(topicPartition, startOffset); + // seek to start offsets + if (requestedStartOffset != null) { + LOG.info("Seeking to offset [{}] of topic partition [{}]", requestedStartOffset, topicPartition); + consumer.seek(topicPartition, requestedStartOffset); + this.startOffset = consumer.position(topicPartition); + if (this.startOffset != requestedStartOffset) { + LOG.warn("Current Start Offset [{}] is different form the requested start position [{}]", + this.startOffset, + requestedStartOffset); + } } else { - LOG.info("Seeking to beginning of topic partition [{}]", topicPartition); + // case seek to beginning of stream + consumer.seekToBeginning(Collections.singleton(topicPartition)); // seekToBeginning is lazy thus need to
call position() or poll(0) - this.consumer.seekToBeginning(Collections.singleton(topicPartition)); - startOffset = consumer.position(topicPartition); - } - if (endOffset == -1) { - this.endOffset = consumer.endOffsets(topicPartitionList).get(topicPartition); - LOG.info("EndOffset set to {}", endOffset); + this.startOffset = consumer.position(topicPartition); + LOG.info("Consumer at beginning of topic partition [{}], current start offset [{}]", + topicPartition, + this.startOffset); } - currentOffset = consumer.position(topicPartition); - Preconditions.checkState(this.endOffset >= currentOffset, - "End offset [%s] need to be greater than start offset [%s]", + + consumerPosition = consumer.position(topicPartition); + Preconditions.checkState(this.endOffset >= consumerPosition, + "End offset [%s] need to be greater or equal than start offset [%s]", this.endOffset, - currentOffset); - LOG.info("Kafka Iterator ready, assigned TopicPartition [{}]; startOffset [{}]; endOffset [{}]", + consumerPosition); + LOG.info("Kafka Iterator assigned to TopicPartition [{}]; start Offset [{}]; end Offset [{}]", topicPartition, - currentOffset, + consumerPosition, this.endOffset); - if (LOG.isTraceEnabled()) { - stopwatch.stop(); - LOG.trace("Time to assign and seek [{}] ms", stopwatch.elapsed(TimeUnit.MILLISECONDS)); - } + } - @Override - public boolean hasNext() { + @VisibleForTesting KafkaRecordIterator(Consumer<byte[], byte[]> consumer, TopicPartition tp, long pollTimeoutMs) { + this(consumer, tp, null, null, pollTimeoutMs); + } + + /** + * @throws IllegalStateException if the kafka consumer poll call can not reach the target offset. + * @return true if has more records to be consumed. 
+ */ + @Override public boolean hasNext() { /* Poll more records from Kafka queue IF: - Initial poll case -> (records == null) + Initial poll -> (records == null) OR - Need to poll at least one more record (currentOffset + 1 < endOffset) AND consumerRecordIterator is empty (!hasMore) + Need to poll at least one more record (consumerPosition < endOffset) AND consumerRecordIterator is empty (!hasMore) */ - if (!hasMore && currentOffset + 1 < endOffset || records == null) { + if (!hasMore && consumerPosition < endOffset || records == null) { pollRecords(); findNext(); } @@ -145,65 +156,49 @@ public class KafkaRecordIterator implements Iterator<ConsumerRecord<byte[], byte } /** - * Poll more records or Fail with {@link TimeoutException} if no records returned before reaching target end offset. + * Poll more records from the Kafka Broker. + * + * @throws IllegalStateException if no records returned before consumer position reaches target end offset. */ private void pollRecords() { if (LOG.isTraceEnabled()) { stopwatch.reset().start(); } - Preconditions.checkArgument(started); records = consumer.poll(pollTimeoutMs); if (LOG.isTraceEnabled()) { stopwatch.stop(); LOG.trace("Pulled [{}] records in [{}] ms", records.count(), stopwatch.elapsed(TimeUnit.MILLISECONDS)); } // Fail if we can not poll within one lap of pollTimeoutMs. - if (records.isEmpty() && currentOffset < endOffset) { - throw new TimeoutException(String.format("Current offset: [%s]-TopicPartition:[%s], target End offset:[%s]." 
- + "Consumer returned 0 record due to exhausted poll timeout [%s]ms, try increasing[%s]", - currentOffset, - topicPartition.toString(), - endOffset, + if (records.isEmpty() && consumer.position(topicPartition) < endOffset) { + throw new IllegalStateException(String.format(ERROR_POLL_TIMEOUT_FORMAT, pollTimeoutMs, - KafkaStreamingUtils.HIVE_KAFKA_POLL_TIMEOUT)); + topicPartition.toString(), + startOffset, + consumer.position(topicPartition), + endOffset)); } consumerRecordIterator = records.iterator(); + consumerPosition = consumer.position(topicPartition); } @Override public ConsumerRecord<byte[], byte[]> next() { ConsumerRecord<byte[], byte[]> value = nextRecord; Preconditions.checkState(value.offset() < endOffset); findNext(); - return Preconditions.checkNotNull(value); + return value; } /** - * Find the next element in the batch of returned records by previous poll or set hasMore to false tp poll more next - * call to {@link KafkaRecordIterator#hasNext()}. + * Find the next element in the current batch OR schedule {@link KafkaRecordIterator#pollRecords()} (hasMore = false). */ private void findNext() { if (consumerRecordIterator.hasNext()) { nextRecord = consumerRecordIterator.next(); - hasMore = true; - if (nextRecord.offset() < endOffset) { - currentOffset = nextRecord.offset(); - return; - } - } - hasMore = false; - nextRecord = null; - } - - /** - * Empty iterator for empty splits when startOffset == endOffset, this is added to avoid clumsy if condition. 
- */ - protected static final class EmptyIterator implements Iterator<ConsumerRecord<byte[], byte[]>> { - @Override public boolean hasNext() { - return false; - } - - @Override public ConsumerRecord<byte[], byte[]> next() { - throw new IllegalStateException("this is an empty iterator"); + hasMore = nextRecord.offset() < endOffset; + } else { + hasMore = false; + nextRecord = null; } } } http://git-wip-us.apache.org/repos/asf/hive/blob/beaa8a8c/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaRecordIteratorTest.java ---------------------------------------------------------------------- diff --git a/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaRecordIteratorTest.java b/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaRecordIteratorTest.java index 98a5568..e048fb3 100644 --- a/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaRecordIteratorTest.java +++ b/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaRecordIteratorTest.java @@ -38,7 +38,6 @@ import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.apache.kafka.common.utils.Time; import org.junit.After; @@ -192,9 +191,13 @@ public class KafkaRecordIteratorTest { recordReader.close(); } - @Test(expected = TimeoutException.class) public void testPullingBeyondLimit() { - this.kafkaRecordIterator = new KafkaRecordIterator(this.consumer, TOPIC_PARTITION, 0L, 101L, POLL_TIMEOUT_MS); - this.compareIterator(RECORDS, this.kafkaRecordIterator); + @Test(expected = IllegalStateException.class) public void testPullingBeyondLimit() { + //FYI In the Tx world Commits can introduce offset gaps therefore + //this (RECORD_NUMBER + 1) as beyond limit offset is only true if the topic has no Tx or any
Control msg. + this.kafkaRecordIterator = new KafkaRecordIterator(this.consumer, TOPIC_PARTITION, 19383L, (long) RECORD_NUMBER + 1, POLL_TIMEOUT_MS); + this.compareIterator(RECORDS.stream() + .filter((consumerRecord) -> consumerRecord.offset() >= 19383L) + .collect(Collectors.toList()), this.kafkaRecordIterator); } @Test(expected = IllegalStateException.class) public void testPullingStartGreaterThanEnd() { @@ -202,13 +205,13 @@ public class KafkaRecordIteratorTest { this.compareIterator(RECORDS, this.kafkaRecordIterator); } - @Test(expected = TimeoutException.class) public void testPullingFromEmptyTopic() { + @Test(expected = IllegalStateException.class) public void testPullingFromEmptyTopic() { this.kafkaRecordIterator = new KafkaRecordIterator(this.consumer, new TopicPartition("noHere", 0), 0L, 100L, POLL_TIMEOUT_MS); this.compareIterator(RECORDS, this.kafkaRecordIterator); } - @Test(expected = TimeoutException.class) public void testPullingFromEmptyPartition() { + @Test(expected = IllegalStateException.class) public void testPullingFromEmptyPartition() { this.kafkaRecordIterator = new KafkaRecordIterator(this.consumer, new TopicPartition(TOPIC, 1), 0L, 100L, POLL_TIMEOUT_MS); this.compareIterator(RECORDS, this.kafkaRecordIterator); @@ -268,18 +271,19 @@ public class KafkaRecordIteratorTest { consumerProps.setProperty("enable.auto.commit", "false"); consumerProps.setProperty("auto.offset.reset", "none"); consumerProps.setProperty("bootstrap.servers", "127.0.0.1:9092"); - this.conf.set("kafka.bootstrap.servers", "127.0.0.1:9092"); + conf.set("kafka.bootstrap.servers", "127.0.0.1:9092"); consumerProps.setProperty("key.deserializer", ByteArrayDeserializer.class.getName()); consumerProps.setProperty("value.deserializer", ByteArrayDeserializer.class.getName()); consumerProps.setProperty("request.timeout.ms", "3002"); consumerProps.setProperty("fetch.max.wait.ms", "3001"); consumerProps.setProperty("session.timeout.ms", "3001"); 
consumerProps.setProperty("metadata.max.age.ms", "100"); + consumerProps.setProperty("max.poll.records", String.valueOf(RECORD_NUMBER - 1)); this.consumer = new KafkaConsumer<>(consumerProps); } private static void sendData() { - LOG.info("Sending {} records", RECORD_NUMBER); + LOG.info("Sending [{}] records", RECORDS.size()); RECORDS.stream() .map(consumerRecord -> new ProducerRecord<>(consumerRecord.topic(), consumerRecord.partition(),