[
https://issues.apache.org/jira/browse/STORM-380?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14073703#comment-14073703
]
ASF GitHub Bot commented on STORM-380:
--------------------------------------
Github user d2r commented on a diff in the pull request:
https://github.com/apache/incubator-storm/pull/170#discussion_r15375343
--- Diff: external/storm-kafka/src/jvm/storm/kafka/DynamicBrokersReader.java ---
@@ -116,6 +116,9 @@ private int getLeaderFor(long partition) {
     byte[] hostPortData = _curator.getData().forPath(topicBrokersPath + "/" + partition + "/state");
     Map<Object, Object> value = (Map<Object, Object>) JSONValue.parse(new String(hostPortData, "UTF-8"));
     Integer leader = ((Number) value.get("leader")).intValue();
+    if (leader == -1) {
+        throw new RuntimeException("No leader found for partition " + partition);
+    }
--- End diff --
It would be nice to add a catch for this so that we do not throw a
doubly-wrapped RuntimeException.
...
```Java
} catch (RuntimeException e) {
    throw e;
} catch (Exception e) {
```
...
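To illustrate the suggestion, here is a minimal, self-contained sketch of the catch ordering. It assumes the existing `catch (Exception e)` block wraps the exception in a new `RuntimeException`, which is what would cause the double wrapping. The class name `LeaderLookupSketch`, the helper `readState`, and the in-memory `store` map are hypothetical stand-ins for the Curator/ZooKeeper read and are not part of storm-kafka.

```java
import java.util.HashMap;
import java.util.Map;

class LeaderLookupSketch {

    // Hypothetical stand-in for the ZooKeeper read; throws a checked exception
    // when the partition has no state entry.
    static Map<String, Object> readState(Map<Long, Map<String, Object>> store,
                                         long partition) throws Exception {
        Map<String, Object> state = store.get(partition);
        if (state == null) {
            throw new Exception("No state found for partition " + partition);
        }
        return state;
    }

    static int getLeaderFor(Map<Long, Map<String, Object>> store, long partition) {
        try {
            Map<String, Object> value = readState(store, partition);
            int leader = ((Number) value.get("leader")).intValue();
            if (leader == -1) {
                throw new RuntimeException("No leader found for partition " + partition);
            }
            return leader;
        } catch (RuntimeException e) {
            // Already unchecked: rethrow unchanged so it is not wrapped again.
            throw e;
        } catch (Exception e) {
            // Assumed existing behaviour: checked exceptions are wrapped once.
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        Map<Long, Map<String, Object>> store = new HashMap<>();
        Map<String, Object> noLeader = new HashMap<>();
        noLeader.put("leader", -1);
        store.put(1L, noLeader);
        try {
            getLeaderFor(store, 1L);
        } catch (RuntimeException e) {
            // With the extra catch, the exception has no wrapped cause.
            System.out.println(e.getMessage() + " (cause: " + e.getCause() + ")");
        }
    }
}
```

Without the `catch (RuntimeException e)` clause, the "No leader found" exception would be caught by the generic handler and surface as the cause of a second `RuntimeException`.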
> Kafka spout: throw RuntimeException if a leader cannot be found for a
> partition
> -------------------------------------------------------------------------------
>
> Key: STORM-380
> URL: https://issues.apache.org/jira/browse/STORM-380
> Project: Apache Storm (Incubating)
> Issue Type: Improvement
> Reporter: Stephen Elliott
> Priority: Minor
>
> If one of the partitions of the Kafka queue has no leader available,
> DynamicBrokersReader.getLeaderFor(partition) returns -1. This is logged in
> DynamicBrokersReader.getBrokerInfo (as there's no ZK node for a broker id -1)
> but there is no further action.
> In this situation the spout will carry on emitting only for those partitions
> that are available.
> My assumption is that if some partitions for a queue are unavailable, there's
> a problem that needs to be addressed before processing continues. Happy to
> discuss further.