[
https://issues.apache.org/jira/browse/FLINK-3295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15121161#comment-15121161
]
ASF GitHub Bot commented on FLINK-3295:
---------------------------------------
GitHub user rmetzger opened a pull request:
https://github.com/apache/flink/pull/1555
[FLINK-3295][Kafka 0.8] Improve ZK error behavior
This PR changes the behavior of the Kafka 0.8 source if there is an error
while getting the initial offsets from ZK.
Instead of failing, the source now only logs the exception (together with
the ZK connect string) and determines the offsets from Kafka (depending on
`auto.offset.reset`).
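
The log-and-fall-back behavior described above could look roughly like the following sketch. It is not the code from this pull request: the class `OffsetFallbackSketch`, the `OffsetLookup` interface, and the `OFFSET_NOT_SET` sentinel are hypothetical stand-ins, and plain `java.util.logging` is used so that the example runs without Flink, Kafka, or ZooKeeper on the classpath.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;

public class OffsetFallbackSketch {

    private static final Logger LOG =
            Logger.getLogger(OffsetFallbackSketch.class.getName());

    /** Hypothetical sentinel: "no committed offset, let auto.offset.reset decide". */
    public static final long OFFSET_NOT_SET = -1L;

    /** Hypothetical stand-in for the ZooKeeper offset lookup, which may fail. */
    public interface OffsetLookup {
        Map<Integer, Long> fetch(List<Integer> partitions) throws Exception;
    }

    /**
     * Returns one start offset per partition. If the lookup throws, the error
     * is logged with the connect string and every partition keeps the sentinel.
     */
    public static Map<Integer, Long> resolveInitialOffsets(
            OffsetLookup zkLookup, List<Integer> partitions, String zkConnect) {

        // Start with "not set" for every partition.
        Map<Integer, Long> offsets = new HashMap<>();
        for (Integer partition : partitions) {
            offsets.put(partition, OFFSET_NOT_SET);
        }

        try {
            // Overwrite the sentinel wherever a committed offset was found.
            for (Map.Entry<Integer, Long> e : zkLookup.fetch(partitions).entrySet()) {
                if (offsets.containsKey(e.getKey()) && e.getValue() != null) {
                    offsets.put(e.getKey(), e.getValue());
                }
            }
        } catch (Exception e) {
            // Log instead of failing the task; include the connect string
            // so that a wrong or missing chroot path is easy to spot.
            LOG.log(Level.WARNING,
                    "Could not read initial offsets from ZooKeeper (connect string: "
                            + zkConnect + "); falling back to auto.offset.reset", e);
        }
        return offsets;
    }
}
```

A caller would then start every partition that still carries `OFFSET_NOT_SET` from the position selected by `auto.offset.reset`.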
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/rmetzger/flink flink3295
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/1555.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #1555
----
commit fc59b453f4e4b47ca65d17db3d13d30940b93f1a
Author: Robert Metzger <[email protected]>
Date: 2016-01-28T09:53:13Z
[FLINK-3295][Kafka 0.8] Improve ZK error behavior
----
> Flink Kafka consumer fails with NoNodeException if zkConnect string contains
> chroot-path
> ----------------------------------------------------------------------------------------
>
> Key: FLINK-3295
> URL: https://issues.apache.org/jira/browse/FLINK-3295
> Project: Flink
> Issue Type: Bug
> Reporter: Robert Metzger
> Assignee: Robert Metzger
>
> Since version 3.2, ZooKeeper allows a base path (similar to chroot) to be
> passed with the connection string. Flink's {{ZookeeperOffsetHandler}} doesn't
> show a very helpful error message when the base path hasn't been created by
> the user.
> This is the exception:
> {code}
> org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /consumers
> at org.apache.zookeeper.KeeperException.create(KeeperException.java:111)
> at org.apache.zookeeper.KeeperException.create(KeeperException.java:51)
> at org.apache.zookeeper.ZooKeeper.create(ZooKeeper.java:783)
> at org.apache.curator.utils.ZKPaths.mkdirs(ZKPaths.java:232)
> at org.apache.curator.utils.EnsurePath$InitialHelper$1.call(EnsurePath.java:148)
> at org.apache.curator.RetryLoop.callWithRetry(RetryLoop.java:107)
> at org.apache.curator.utils.EnsurePath$InitialHelper.ensure(EnsurePath.java:141)
> at org.apache.curator.utils.EnsurePath.ensure(EnsurePath.java:99)
> at org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.getOffsetFromZooKeeper(ZookeeperOffsetHandler.java:125)
> at org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.seekFetcherToInitialOffsets(ZookeeperOffsetHandler.java:93)
> at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.open(FlinkKafkaConsumer08.java:283)
> at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
> at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:89)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:305)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:227)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:568)
> at java.lang.Thread.run(Thread.java:745)
> {code}
> But the user passed a connection string like: {{host:port/foo/bar}}.
> See also: http://zookeeper.apache.org/doc/trunk/zookeeperProgrammers.html
> (section on "ZooKeeper Sessions").
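
To illustrate why the exception names `/consumers` although the connect string was `host:port/foo/bar`, here is a minimal sketch of how a chroot suffix maps client-side paths to server-side paths. It does not use ZooKeeper's own parser: the class `ChrootPathSketch` and its methods are hypothetical, and the splitting rule (everything from the first `/` onward is the base path) is a simplified assumption based on the connect-string format described in the issue.

```java
public class ChrootPathSketch {

    /** Returns the chroot suffix of a connect string, or "" if there is none. */
    public static String chrootOf(String connectString) {
        int slash = connectString.indexOf('/');
        if (slash < 0) {
            return "";
        }
        String chroot = connectString.substring(slash);
        // A lone "/" is treated as "no chroot" in this sketch.
        return chroot.length() == 1 ? "" : chroot;
    }

    /** Absolute server-side path for a path the client sees under its chroot. */
    public static String serverPath(String connectString, String clientPath) {
        return chrootOf(connectString) + clientPath;
    }

    public static void main(String[] args) {
        // The client asks for /consumers, which lives under the base path.
        System.out.println(serverPath("host:port/foo/bar", "/consumers"));
        // prints "/foo/bar/consumers"
    }
}
```

Under this reading, creating `/consumers` fails with `NoNode` when the parent `/foo/bar` does not exist on the server, while the error message only reports the client-side path.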