[ https://issues.apache.org/jira/browse/NIFI-1665?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15206528#comment-15206528 ]
ASF GitHub Bot commented on NIFI-1665:
--------------------------------------
GitHub user olegz opened a pull request:
https://github.com/apache/nifi/pull/296
NIFI-1665 fixed GetKafka to reset consumer in case of timeout
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/olegz/nifi NIFI-1665
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/nifi/pull/296.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #296
----
commit 4840991d1feb9f10413adddcbd82bca563268999
Author: Oleg Zhurakousky
Date: 2016-03-22T15:19:39Z
NIFI-1665 fixed GetKafka to reset consumer in case of timeout
----
> GetKafka continually throws NullPointerException if it ever fails to write
> out message
> --------------------------------------------------------------------------------------
>
> Key: NIFI-1665
> URL: https://issues.apache.org/jira/browse/NIFI-1665
> Project: Apache NiFi
> Issue Type: Bug
> Reporter: Mark Payne
> Assignee: Oleg Zhurakousky
>
> If an Exception is thrown in GetKafka's consumeFromKafka method, it enters
> the following block:
> {code}
> catch (final Exception e) {
>     this.shutdownConsumer();
>     getLogger().error("Failed to receive FlowFile from Kafka due to {}", new Object[]{e});
>     if (flowFile != null) {
>         session.remove(flowFile);
>     }
> {code}
> This call to shutdownConsumer performs the following:
> {code}
> if (this.executor != null) {
>     this.executor.shutdown();
>     try {
>         if (!this.executor.awaitTermination(30000, TimeUnit.MILLISECONDS)) {
>             this.executor.shutdownNow();
>             getLogger().warn("Executor did not stop in 30 sec. Terminated.");
>         }
>         this.executor = null;
>     } catch (InterruptedException e) {
>         Thread.currentThread().interrupt();
>     }
> }
> {code}
> Now that this.executor is set to null, the onTrigger method will continually
> throw NullPointerException because it attempts to call executor.submit:
> {code}
> synchronized (this.consumerStreamsReady) {
>     if (!this.consumerStreamsReady.get()) {
>         Future<Void> f = this.executor.submit(new Callable<Void>() {
>             ...
> {code}
> and also
> {code}
> if (this.consumerStreamsReady.get()) {
>     Future<Void> consumptionFuture = this.executor.submit(new Callable<Void>() {
>         @Override
>         public Void call() throws Exception {
>             ...
> {code}
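
One way to avoid the repeated NullPointerException described above is to route
every use of the executor through an accessor that recreates it after a
shutdown. The following is a minimal sketch under that assumption, not the
change made in the pull request; the class ResettableWorker and its method
names are hypothetical and are not part of GetKafka.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the processor; not the actual GetKafka code.
class ResettableWorker {
    private ExecutorService executor;

    // Recreate the executor if a previous shutdown cleared or stopped it.
    private ExecutorService executor() {
        if (executor == null || executor.isShutdown()) {
            executor = Executors.newCachedThreadPool();
        }
        return executor;
    }

    // Similar to the quoted shutdown logic, except that the field is cleared
    // in a finally block so it is reset even if the wait is interrupted.
    synchronized void shutdown() {
        if (executor != null) {
            executor.shutdown();
            try {
                if (!executor.awaitTermination(30000, TimeUnit.MILLISECONDS)) {
                    executor.shutdownNow();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                executor = null;
            }
        }
    }

    // Analogue of the submit calls in onTrigger: always goes through the
    // accessor, so a null field is replaced instead of dereferenced.
    synchronized <T> Future<T> submit(Callable<T> task) {
        return executor().submit(task);
    }
}
```

With this arrangement, a submit that follows a shutdown gets a fresh executor
rather than failing. Both methods are synchronized so that a shutdown cannot
interleave with a submit on the same instance.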