Repository: kafka Updated Branches: refs/heads/trunk 13c3e049f -> 1ac2640f8
KAFKA-2683: ensure wakeup exceptions raised to user Author: Jason Gustafson <ja...@confluent.io> Reviewers: Ewen Cheslack-Postava, Guozhang Wang Closes #366 from hachikuji/KAFKA-2683 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1ac2640f Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1ac2640f Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1ac2640f Branch: refs/heads/trunk Commit: 1ac2640f8095262f423c770060b737f81652e211 Parents: 13c3e04 Author: Jason Gustafson <ja...@confluent.io> Authored: Tue Oct 27 17:39:19 2015 -0700 Committer: Guozhang Wang <wangg...@gmail.com> Committed: Tue Oct 27 17:39:19 2015 -0700 ---------------------------------------------------------------------- .../consumer/ConsumerWakeupException.java | 20 --------------- .../kafka/clients/consumer/KafkaConsumer.java | 26 ++++++++++---------- .../kafka/clients/consumer/MockConsumer.java | 3 ++- .../consumer/internals/AbstractCoordinator.java | 26 +++++++++++--------- .../consumer/internals/ConsumerCoordinator.java | 10 +++++--- .../internals/ConsumerNetworkClient.java | 10 ++++---- .../kafka/common/errors/WakeupException.java | 26 ++++++++++++++++++++ .../internals/ConsumerNetworkClientTest.java | 4 +-- .../kafka/copycat/runtime/WorkerSinkTask.java | 3 ++- .../runtime/distributed/DistributedHerder.java | 6 ++--- .../runtime/distributed/WorkerGroupMember.java | 2 +- .../kafka/copycat/util/KafkaBasedLog.java | 10 ++++---- 12 files changed, 81 insertions(+), 65 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/1ac2640f/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerWakeupException.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerWakeupException.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerWakeupException.java deleted file mode 100644 index 35f1ec9..0000000 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerWakeupException.java +++ /dev/null @@ -1,20 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE - * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file - * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the - * License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ -package org.apache.kafka.clients.consumer; - -import org.apache.kafka.common.KafkaException; - -public class ConsumerWakeupException extends KafkaException { - private static final long serialVersionUID = 1L; - -} http://git-wip-us.apache.org/repos/asf/kafka/blob/1ac2640f/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index 06a9239..7aef8a3 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -313,9 +313,9 @@ import java.util.regex.Pattern; * * <p> * The only exception to this rule is {@link #wakeup()}, which can safely be used from an external thread to - * interrupt an active operation. In this case, a {@link ConsumerWakeupException} will be thrown from the thread - * blocking on the operation. This can be used to shutdown the consumer from another thread. The following - * snippet shows the typical pattern: + * interrupt an active operation. In this case, a {@link org.apache.kafka.common.errors.WakeupException} will be + * thrown from the thread blocking on the operation. This can be used to shutdown the consumer from another thread. + * The following snippet shows the typical pattern: * * <pre> * public class KafkaConsumerRunner implements Runnable { @@ -329,7 +329,7 @@ import java.util.regex.Pattern; * ConsumerRecords records = consumer.poll(10000); * // Handle new records * } - * } catch (ConsumerWakeupException e) { + * } catch (WakeupException e) { * // Ignore exception if closing * if (!closed.get()) throw e; * } finally { @@ -778,7 +778,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { * offset reset policy has been configured. * @throws org.apache.kafka.common.errors.OffsetOutOfRangeException If there is OffsetOutOfRange error in fetchResponse and * the defaultResetPolicy is NONE - * @throws org.apache.kafka.clients.consumer.ConsumerWakeupException if {@link #wakeup()} is called before or while this function is called + * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this function is called */ @Override public ConsumerRecords<K, V> poll(long timeout) { @@ -818,7 +818,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { * @return The fetched records (may be empty) * @throws org.apache.kafka.common.errors.OffsetOutOfRangeException If there is OffsetOutOfRange error in fetchResponse and * the defaultResetPolicy is NONE - * @throws org.apache.kafka.clients.consumer.ConsumerWakeupException if {@link #wakeup()} is called before or while this function is called + * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this function is called */ private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) { // TODO: Sub-requests should take into account the poll timeout (KAFKA-1894) @@ -858,7 +858,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { * This is a synchronous commits and will block until either the commit succeeds or an unrecoverable error is * encountered (in which case it is thrown to the caller). * - * @throws org.apache.kafka.clients.consumer.ConsumerWakeupException if {@link #wakeup()} is called before or while this function is called + * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this function is called */ @Override public void commitSync() { @@ -881,7 +881,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { * encountered (in which case it is thrown to the caller). * * @param offsets A map of offsets by partition with associated metadata - * @throws org.apache.kafka.clients.consumer.ConsumerWakeupException if {@link #wakeup()} is called before or while this function is called + * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this function is called */ @Override public void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets) { @@ -1006,7 +1006,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { * @return The offset * @throws NoOffsetForPartitionException If a position hasn't been set for a given partition, and no reset policy is * available. - * @throws org.apache.kafka.clients.consumer.ConsumerWakeupException if {@link #wakeup()} is called before or while this function is called + * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this function is called */ public long position(TopicPartition partition) { acquire(); @@ -1033,7 +1033,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { * * @param partition The partition to check * @return The last committed offset and metadata or null if there was no prior commit - * @throws org.apache.kafka.clients.consumer.ConsumerWakeupException if {@link #wakeup()} is called before or while this function is called + * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this function is called */ @Override public OffsetAndMetadata committed(TopicPartition partition) { @@ -1071,7 +1071,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { * * @param topic The topic to get partition metadata for * @return The list of partitions - * @throws org.apache.kafka.clients.consumer.ConsumerWakeupException if {@link #wakeup()} is called before or while this function is called + * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this function is called */ @Override public List<PartitionInfo> partitionsFor(String topic) { @@ -1094,7 +1094,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { * server. * * @return The map of topics and its partitions - * @throws org.apache.kafka.clients.consumer.ConsumerWakeupException if {@link #wakeup()} is called before or while this function is called + * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this function is called */ @Override public Map<String, List<PartitionInfo>> listTopics() { @@ -1158,7 +1158,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { /** * Wakeup the consumer. This method is thread-safe and is useful in particular to abort a long poll. - * The thread which is blocking in an operation will throw {@link ConsumerWakeupException}. + * The thread which is blocking in an operation will throw {@link WakeupException}. */ @Override public void wakeup() { http://git-wip-us.apache.org/repos/asf/kafka/blob/1ac2640f/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java index ed1c1e2..25c0c2c 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java @@ -19,6 +19,7 @@ import org.apache.kafka.common.Metric; import org.apache.kafka.common.MetricName; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.WakeupException; import java.util.ArrayList; import java.util.Collection; @@ -135,7 +136,7 @@ public class MockConsumer<K, V> implements Consumer<K, V> { if (wakeup.get()) { wakeup.set(false); - throw new ConsumerWakeupException(); + throw new WakeupException(); } if (exception != null) { http://git-wip-us.apache.org/repos/asf/kafka/blob/1ac2640f/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java index 549c8de..8d5ee16 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java @@ -30,7 +30,6 @@ import org.apache.kafka.common.metrics.stats.Max; import org.apache.kafka.common.metrics.stats.Rate; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; -import org.apache.kafka.common.protocol.types.SchemaException; import org.apache.kafka.common.requests.GroupMetadataRequest; import org.apache.kafka.common.requests.GroupMetadataResponse; import org.apache.kafka.common.requests.HeartbeatRequest; @@ -181,8 +180,12 @@ public abstract class AbstractCoordinator { RequestFuture<Void> future = sendGroupMetadataRequest(); client.poll(future, requestTimeoutMs); - if (future.failed()) - client.awaitMetadataUpdate(); + if (future.failed()) { + if (future.isRetriable()) + client.awaitMetadataUpdate(); + else + throw future.exception(); + } } } @@ -417,12 +420,8 @@ public abstract class AbstractCoordinator { RequestFuture<ByteBuffer> future) { short errorCode = syncResponse.errorCode(); if (errorCode == Errors.NONE.code()) { - try { - future.complete(syncResponse.memberAssignment()); - sensors.syncLatency.record(response.requestLatencyMs()); - } catch (SchemaException e) { - future.raise(e); - } + future.complete(syncResponse.memberAssignment()); + sensors.syncLatency.record(response.requestLatencyMs()); } else { AbstractCoordinator.this.rejoinNeeded = true; future.raise(Errors.forCode(errorCode)); @@ -588,8 +587,13 @@ public abstract class AbstractCoordinator { return; } - R response = parse(clientResponse); - handle(response, future); + try { + R response = parse(clientResponse); + handle(response, future); + } catch (RuntimeException e) { + if (!future.isDone()) + future.raise(e); + } } } http://git-wip-us.apache.org/repos/asf/kafka/blob/1ac2640f/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java index 20d1564..641939a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java @@ -15,7 +15,7 @@ package org.apache.kafka.clients.consumer.internals; import org.apache.kafka.clients.ClientResponse; import org.apache.kafka.clients.Metadata; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; -import org.apache.kafka.clients.consumer.ConsumerWakeupException; +import org.apache.kafka.common.errors.WakeupException; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.consumer.OffsetCommitCallback; import org.apache.kafka.clients.consumer.internals.PartitionAssignor.Assignment; @@ -180,6 +180,8 @@ public final class ConsumerCoordinator extends AbstractCoordinator implements Cl try { Set<TopicPartition> assigned = new HashSet<>(subscriptions.assignedPartitions()); listener.onPartitionsAssigned(assigned); + } catch (WakeupException e) { + throw e; } catch (Exception e) { log.error("User provided listener " + listener.getClass().getName() + " failed on partition assignment: ", e); @@ -234,6 +236,8 @@ public final class ConsumerCoordinator extends AbstractCoordinator implements Cl try { Set<TopicPartition> revoked = new HashSet<>(subscriptions.assignedPartitions()); listener.onPartitionsRevoked(revoked); + } catch (WakeupException e) { + throw e; } catch (Exception e) { log.error("User provided listener " + listener.getClass().getName() + " failed on partition revocation: ", e); @@ -302,7 +306,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator implements Cl try { maybeAutoCommitOffsetsSync(); return; - } catch (ConsumerWakeupException e) { + } catch (WakeupException e) { // ignore wakeups while closing to ensure we have a chance to commit continue; } @@ -368,7 +372,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator implements Cl if (autoCommitEnabled) { try { commitOffsetsSync(subscriptions.allConsumed()); - } catch (ConsumerWakeupException e) { + } catch (WakeupException e) { // rethrow wakeups since they are triggered by the user throw e; } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/kafka/blob/1ac2640f/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java index e3a2514..4757fc4 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java @@ -17,7 +17,7 @@ import org.apache.kafka.clients.ClientResponse; import org.apache.kafka.clients.KafkaClient; import org.apache.kafka.clients.Metadata; import org.apache.kafka.clients.RequestCompletionHandler; -import org.apache.kafka.clients.consumer.ConsumerWakeupException; +import org.apache.kafka.common.errors.WakeupException; import org.apache.kafka.common.Node; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.requests.AbstractRequest; @@ -147,7 +147,7 @@ public class ConsumerNetworkClient implements Closeable { /** * Block indefinitely until the given request future has finished. * @param future The request future to await. - * @throws ConsumerWakeupException if {@link #wakeup()} is called from another thread + * @throws WakeupException if {@link #wakeup()} is called from another thread */ public void poll(RequestFuture<?> future) { while (!future.isDone()) @@ -159,7 +159,7 @@ public class ConsumerNetworkClient implements Closeable { * @param future The request future to wait for * @param timeout The maximum duration (in ms) to wait for the request * @return true if the future is done, false otherwise - * @throws ConsumerWakeupException if {@link #wakeup()} is called from another thread + * @throws WakeupException if {@link #wakeup()} is called from another thread */ public boolean poll(RequestFuture<?> future, long timeout) { long now = time.milliseconds(); @@ -175,7 +175,7 @@ public class ConsumerNetworkClient implements Closeable { * Poll for any network IO. All send requests will either be transmitted on the network * or failed when this call completes. * @param timeout The maximum time to wait for an IO event. - * @throws ConsumerWakeupException if {@link #wakeup()} is called from another thread + * @throws WakeupException if {@link #wakeup()} is called from another thread */ public void poll(long timeout) { poll(timeout, time.milliseconds()); @@ -298,7 +298,7 @@ public class ConsumerNetworkClient implements Closeable { if (wakeup.get()) { failUnsentRequests(); wakeup.set(false); - throw new ConsumerWakeupException(); + throw new WakeupException(); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/1ac2640f/clients/src/main/java/org/apache/kafka/common/errors/WakeupException.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/errors/WakeupException.java b/clients/src/main/java/org/apache/kafka/common/errors/WakeupException.java new file mode 100644 index 0000000..a2e718d --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/errors/WakeupException.java @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package org.apache.kafka.common.errors; + +import org.apache.kafka.common.KafkaException; + +/** + * Exception used to indicate preemption of a blocking operation by an external thread. + * For example, {@link org.apache.kafka.clients.consumer.KafkaConsumer#wakeup} + * can be used to break out of an active {@link org.apache.kafka.clients.consumer.KafkaConsumer#poll(long)}, + * which would raise an instance of this exception. + */ +public class WakeupException extends KafkaException { + private static final long serialVersionUID = 1L; + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/1ac2640f/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java index 6a42058..1692010 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java @@ -15,7 +15,7 @@ package org.apache.kafka.clients.consumer.internals; import org.apache.kafka.clients.ClientResponse; import org.apache.kafka.clients.Metadata; import org.apache.kafka.clients.MockClient; -import org.apache.kafka.clients.consumer.ConsumerWakeupException; +import org.apache.kafka.common.errors.WakeupException; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.Node; import org.apache.kafka.common.protocol.ApiKeys; @@ -96,7 +96,7 @@ public class ConsumerNetworkClientTest { try { consumerClient.poll(0); fail(); - } catch (ConsumerWakeupException e) { + } catch (WakeupException e) { } client.respond(heartbeatResponse(Errors.NONE.code())); http://git-wip-us.apache.org/repos/asf/kafka/blob/1ac2640f/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java ---------------------------------------------------------------------- diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java index edb415a..3c5cd13 100644 --- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java +++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java @@ -19,6 +19,7 @@ package org.apache.kafka.copycat.runtime; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.WakeupException; import org.apache.kafka.common.utils.Time; import org.apache.kafka.clients.consumer.*; import org.apache.kafka.common.utils.Utils; @@ -112,7 +113,7 @@ class WorkerSinkTask implements WorkerTask { ConsumerRecords<byte[], byte[]> msgs = consumer.poll(timeoutMs); log.trace("{} polling returned {} messages", id, msgs.count()); deliverMessages(msgs); - } catch (ConsumerWakeupException we) { + } catch (WakeupException we) { log.trace("{} consumer woken up", id); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/1ac2640f/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerder.java ---------------------------------------------------------------------- diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerder.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerder.java index bf3229d..17bf7b7 100644 --- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerder.java +++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerder.java @@ -17,7 +17,7 @@ package org.apache.kafka.copycat.runtime.distributed; -import org.apache.kafka.clients.consumer.ConsumerWakeupException; +import org.apache.kafka.common.errors.WakeupException; import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.copycat.connector.ConnectorContext; @@ -159,7 +159,7 @@ public class DistributedHerder implements Herder, Runnable { member.ensureActive(); // Ensure we're in a good state in our group. If not restart and everything should be setup to rejoin if (!handleRebalanceCompleted()) return; - } catch (ConsumerWakeupException e) { + } catch (WakeupException e) { // May be due to a request from another thread, or might be stopping. If the latter, we need to check the // flag immediately. If the former, we need to re-run the ensureActive call since we can't handle requests // unless we're in the group. @@ -217,7 +217,7 @@ public class DistributedHerder implements Herder, Runnable { member.poll(Long.MAX_VALUE); // Ensure we're in a good state in our group. If not restart and everything should be setup to rejoin if (!handleRebalanceCompleted()) return; - } catch (ConsumerWakeupException e) { // FIXME should not be ConsumerWakeupException + } catch (WakeupException e) { // FIXME should not be WakeupException // Ignore. Just indicates we need to check the exit flag, for requested actions, etc. } } http://git-wip-us.apache.org/repos/asf/kafka/blob/1ac2640f/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/WorkerGroupMember.java ---------------------------------------------------------------------- diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/WorkerGroupMember.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/WorkerGroupMember.java index f8cabaa..03960cf 100644 --- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/WorkerGroupMember.java +++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/WorkerGroupMember.java @@ -147,7 +147,7 @@ public class WorkerGroupMember { } /** - * Interrupt any running poll() calls, causing a ConsumerWakeupException to be thrown in the thread invoking that method. + * Interrupt any running poll() calls, causing a WakeupException to be thrown in the thread invoking that method. */ public void wakeup() { this.client.wakeup(); http://git-wip-us.apache.org/repos/asf/kafka/blob/1ac2640f/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/KafkaBasedLog.java ---------------------------------------------------------------------- diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/KafkaBasedLog.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/KafkaBasedLog.java index 5e860d9..f5e72d3 100644 --- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/KafkaBasedLog.java +++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/KafkaBasedLog.java @@ -21,7 +21,7 @@ import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.consumer.ConsumerWakeupException; +import org.apache.kafka.common.errors.WakeupException; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; @@ -232,7 +232,7 @@ public class KafkaBasedLog<K, V> { ConsumerRecords<K, V> records = consumer.poll(timeoutMs); for (ConsumerRecord<K, V> record : records) consumedCallback.onCompletion(null, record); - } catch (ConsumerWakeupException e) { + } catch (WakeupException e) { // Expected on get() or stop(). The calling code should handle this throw e; } catch (KafkaException e) { @@ -257,7 +257,7 @@ public class KafkaBasedLog<K, V> { try { poll(0); } finally { - // If there is an exception, even a possibly expected one like ConsumerWakeupException, we need to make sure + // If there is an exception, even a possibly expected one like WakeupException, we need to make sure // the consumers position is reset or it'll get into an inconsistent state. for (TopicPartition tp : assignment) { long startOffset = offsets.get(tp); @@ -300,7 +300,7 @@ public class KafkaBasedLog<K, V> { if (numCallbacks > 0) { try { readToLogEnd(); - } catch (ConsumerWakeupException e) { + } catch (WakeupException e) { // Either received another get() call and need to retry reading to end of log or stop() was // called. Both are handled by restarting this loop. continue; @@ -318,7 +318,7 @@ public class KafkaBasedLog<K, V> { try { poll(Integer.MAX_VALUE); - } catch (ConsumerWakeupException e) { + } catch (WakeupException e) { // See previous comment, both possible causes of this wakeup are handled by starting this loop again continue; }