[
https://issues.apache.org/jira/browse/NIFI-4914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16659403#comment-16659403
]
ASF GitHub Bot commented on NIFI-4914:
--------------------------------------
Github user pvillard31 commented on the issue:
https://github.com/apache/nifi/pull/2882
Hi @david-streamlio - sorry it took so long, I've been busy with a lot of
things...
I've not been able to complete all my tests successfully, but I wanted to
give a status update on my tests so far. All the code changes I made are
available here: 8ca2db6712b9fc843a3d2ee7917ecc6db2dd54f2. Most of the changes
are just for consistency with the code base, but some were required to make
things work.
In addition, I have the following comments. If we leave the ack timeout at
0 sec, we get the following error:
````
2018-10-22 08:23:18,998 WARN [Timer-Driven Process Thread-8] o.a.n.controller.tasks.ConnectableTask Administratively Yielding ConsumePulsar[id=1a70a923-0166-1000-e698-63a0160cc922] due to uncaught Exception: java.lang.IllegalArgumentException: Ack timeout should be should be greater than 1000 ms
java.lang.IllegalArgumentException: Ack timeout should be should be greater than 1000 ms
    at org.apache.pulsar.shade.com.google.common.base.Preconditions.checkArgument(Preconditions.java:122)
    at org.apache.pulsar.client.impl.ConsumerBuilderImpl.ackTimeout(ConsumerBuilderImpl.java:147)
    at org.apache.nifi.processors.pulsar.AbstractPulsarConsumerProcessor.getConsumerBulder(AbstractPulsarConsumerProcessor.java:361)
    at org.apache.nifi.processors.pulsar.AbstractPulsarConsumerProcessor.getConsumer(AbstractPulsarConsumerProcessor.java:338)
    at org.apache.nifi.processors.pulsar.pubsub.ConsumePulsar.onTrigger(ConsumePulsar.java:46)
    at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
    at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1165)
    at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:203)
    at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:117)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
````
Is it something that changed with the latest version of Pulsar?
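One possible way to avoid the exception, sketched under the assumption that a
configured value of 0 is meant to disable the ack timeout: only hand the value
to the Pulsar consumer builder when it is positive, and reject anything below
the 1000 ms minimum quoted in the error message (treated as inclusive here,
which is also an assumption). The class and method names are hypothetical and
do not come from the PR; the actual builder call is left as a comment so the
snippet runs on the JDK alone.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical helper, not part of the PR: decides whether an ack timeout should be set. */
public class AckTimeoutGuard {

    // Minimum taken from the error message ("greater than 1000 ms").
    public static final long MIN_ACK_TIMEOUT_MILLIS = 1000L;

    /**
     * Returns true when the configured value should be passed to the consumer builder.
     * Assumption: a value of 0 means "ack timeout disabled", so the builder call is skipped.
     */
    public static boolean shouldApply(long value, TimeUnit unit) {
        long millis = unit.toMillis(value);
        if (millis == 0) {
            return false; // leave the builder untouched
        }
        if (millis < MIN_ACK_TIMEOUT_MILLIS) {
            throw new IllegalArgumentException(
                "Ack timeout must be 0 (disabled) or at least " + MIN_ACK_TIMEOUT_MILLIS + " ms");
        }
        return true;
    }

    public static void main(String[] args) {
        // In the processor this would guard the builder call, for example:
        //   if (shouldApply(value, unit)) { builder.ackTimeout(value, unit); }
        System.out.println(shouldApply(0, TimeUnit.SECONDS));
        System.out.println(shouldApply(10, TimeUnit.SECONDS));
    }
}
```

Failing fast with a clearer message during property validation, rather than in
onTrigger, would also keep the processor from being administratively yielded.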
In the consume processors, I'd add the topic name as a flowfile attribute,
and possibly other attributes that could be useful for later routing/filtering.
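As a rough sketch of what this could look like, the snippet below builds the
attribute map that a consume processor might attach to each flowfile. The
attribute names (`pulsar.topic`, `pulsar.subscription`) and the helper class
are hypothetical, not taken from the PR. In the processor, such a map would be
applied with `ProcessSession.putAllAttributes`; that call is omitted here so
the example runs on the JDK alone.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper, not part of the PR: collects Pulsar metadata as flowfile attributes. */
public class PulsarFlowFileAttributes {

    /** Builds the attributes for one consumed message; the subscription is optional. */
    public static Map<String, String> forMessage(String topic, String subscription) {
        Map<String, String> attributes = new LinkedHashMap<>();
        attributes.put("pulsar.topic", topic); // assumed attribute name
        if (subscription != null && !subscription.isEmpty()) {
            attributes.put("pulsar.subscription", subscription); // assumed attribute name
        }
        return attributes;
    }

    public static void main(String[] args) {
        Map<String, String> attributes =
            forMessage("persistent://public/default/my-topic", "my-subscription");
        attributes.forEach((name, value) -> System.out.println(name + " = " + value));
    }
}
```

A downstream RouteOnAttribute processor could then route on an expression
such as `${pulsar.topic}`.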
When stopping the processors or stopping NiFi, some processors do not
stop cleanly, and the NiFi process has to be killed to shut down. During my
last test, this happened with ConsumePulsar, but it could affect other
processors as well.
> Implement record model processor for Pulsar, i.e. ConsumePulsarRecord,
> PublishPulsarRecord
> ------------------------------------------------------------------------------------------
>
> Key: NIFI-4914
> URL: https://issues.apache.org/jira/browse/NIFI-4914
> Project: Apache NiFi
> Issue Type: New Feature
> Components: Extensions
> Affects Versions: 1.6.0
> Reporter: David Kjerrumgaard
> Priority: Minor
> Original Estimate: 168h
> Remaining Estimate: 168h
>
> Create record-based processors for Apache Pulsar