Repository: beam Updated Branches: refs/heads/master 85a99e294 -> b8f8d18ae
Add timeout to initialization of partition in KafkaIO Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/526037b6 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/526037b6 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/526037b6 Branch: refs/heads/master Commit: 526037b6786315b9f9fdca6edb636baeb6f83e3f Parents: 85a99e2 Author: Raghu Angadi <[email protected]> Authored: Mon Jul 3 23:54:10 2017 -0700 Committer: Kenneth Knowles <[email protected]> Committed: Thu Jul 6 11:58:41 2017 -0700 ---------------------------------------------------------------------- .../org/apache/beam/sdk/io/kafka/KafkaIO.java | 81 +++++++++++++++----- .../apache/beam/sdk/io/kafka/KafkaIOTest.java | 30 ++++++++ 2 files changed, 92 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/526037b6/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java index e520367..026313a 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java @@ -49,9 +49,11 @@ import java.util.Random; import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import javax.annotation.Nullable; import org.apache.beam.sdk.annotations.Experimental; @@ -1061,8 +1063,32 @@ public class 
KafkaIO { curBatch = Iterators.cycle(nonEmpty); } + private void setupInitialOffset(PartitionState pState) { + Read<K, V> spec = source.spec; + + if (pState.nextOffset != UNINITIALIZED_OFFSET) { + consumer.seek(pState.topicPartition, pState.nextOffset); + } else { + // nextOffset is uninitialized here, meaning start reading from latest record as of now + // ('latest' is the default, and is configurable) or look up offset by 'startReadTime'. + // Remember the current position without waiting until the first record is read. This + // ensures checkpoint is accurate even if the reader is closed before reading any records. + Instant startReadTime = spec.getStartReadTime(); + if (startReadTime != null) { + pState.nextOffset = + consumerSpEL.offsetForTime(consumer, pState.topicPartition, spec.getStartReadTime()); + consumer.seek(pState.topicPartition, pState.nextOffset); + } else { + pState.nextOffset = consumer.position(pState.topicPartition); + } + } + } + + @Override public boolean start() throws IOException { + final int defaultPartitionInitTimeout = 60 * 1000; + final int kafkaRequestTimeoutMultiple = 2; + Read<K, V> spec = source.spec; consumer = spec.getConsumerFactoryFn().apply(spec.getConsumerConfig()); consumerSpEL.evaluateAssign(consumer, spec.getTopicPartitions()); @@ -1077,25 +1103,38 @@ public class KafkaIO { keyDeserializerInstance.configure(spec.getConsumerConfig(), true); valueDeserializerInstance.configure(spec.getConsumerConfig(), false); - for (PartitionState p : partitionStates) { - if (p.nextOffset != UNINITIALIZED_OFFSET) { - consumer.seek(p.topicPartition, p.nextOffset); - } else { - // nextOffset is unininitialized here, meaning start reading from latest record as of now - // ('latest' is the default, and is configurable) or 'look up offset by startReadTime. - // Remember the current position without waiting until the first record is read. This - // ensures checkpoint is accurate even if the reader is closed before reading any records. 
- Instant startReadTime = spec.getStartReadTime(); - if (startReadTime != null) { - p.nextOffset = - consumerSpEL.offsetForTime(consumer, p.topicPartition, spec.getStartReadTime()); - consumer.seek(p.topicPartition, p.nextOffset); - } else { - p.nextOffset = consumer.position(p.topicPartition); + // Seek to start offset for each partition. This is the first interaction with the server. + // Unfortunately it can block forever in case of network issues like incorrect ACLs. + // Initialize partition in a separate thread and cancel it if it takes longer than a minute. + for (final PartitionState pState : partitionStates) { + Future<?> future = consumerPollThread.submit(new Runnable() { + public void run() { + setupInitialOffset(pState); } - } + }); - LOG.info("{}: reading from {} starting at offset {}", name, p.topicPartition, p.nextOffset); + try { + // Timeout : 1 minute OR 2 * Kafka consumer request timeout if it is set. + Integer reqTimeout = (Integer) source.spec.getConsumerConfig().get( + ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG); + future.get(reqTimeout != null ? kafkaRequestTimeoutMultiple * reqTimeout + : defaultPartitionInitTimeout, + TimeUnit.MILLISECONDS); + } catch (TimeoutException e) { + consumer.wakeup(); // This unblocks consumer stuck on network I/O. + // Likely reason : Kafka servers are configured to advertise internal ips, but + // those ips are not accessible from workers outside. + String msg = String.format( + "%s: Timeout while initializing partition '%s'. " + + "Kafka client may not be able to connect to servers.", + this, pState.topicPartition); + LOG.error("{}", msg); + throw new IOException(msg); + } catch (Exception e) { + throw new IOException(e); + } + LOG.info("{}: reading from {} starting at offset {}", + name, pState.topicPartition, pState.nextOffset); } // Start consumer read loop. @@ -1329,8 +1368,12 @@ public class KafkaIO { // might block to enqueue right after availableRecordsQueue.poll() below. 
while (!isShutdown) { - consumer.wakeup(); - offsetConsumer.wakeup(); + if (consumer != null) { + consumer.wakeup(); + } + if (offsetConsumer != null) { + offsetConsumer.wakeup(); + } availableRecordsQueue.poll(); // drain unread batch, this unblocks consumer thread. try { isShutdown = consumerPollThread.awaitTermination(10, TimeUnit.SECONDS) http://git-wip-us.apache.org/repos/asf/beam/blob/526037b6/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java index b69bc83..482f5a2 100644 --- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java +++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java @@ -83,6 +83,7 @@ import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionList; import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.MockConsumer; import org.apache.kafka.clients.consumer.OffsetResetStrategy; @@ -364,6 +365,35 @@ public class KafkaIOTest { } @Test + public void testUnreachableKafkaBrokers() { + // Expect an exception when the Kafka brokers are not reachable on the workers. + // We specify partitions explicitly so that splitting does not involve server interaction. + // Set request timeout to 10ms so that test does not take long. + + thrown.expect(Exception.class); + thrown.expectMessage("Reader-0: Timeout while initializing partition 'test-0'"); + + int numElements = 1000; + PCollection<Long> input = p + .apply(KafkaIO.<Integer, Long>read() + .withBootstrapServers("8.8.8.8:9092") // Google public DNS ip. 
+ .withTopicPartitions(ImmutableList.of(new TopicPartition("test", 0))) + .withKeyDeserializer(IntegerDeserializer.class) + .withValueDeserializer(LongDeserializer.class) + .updateConsumerProperties(ImmutableMap.<String, Object>of( + ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 10, + ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 5, + ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 8, + ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 8)) + .withMaxNumRecords(10) + .withoutMetadata()) + .apply(Values.<Long>create()); + + addCountingAsserts(input, numElements); + p.run(); + } + + @Test public void testUnboundedSourceWithSingleTopic() { // same as testUnboundedSource, but with single topic
