Pierre Villard created NIFI-7889:
------------------------------------

             Summary: ConsumeMQTT - use offer instead of add
                 Key: NIFI-7889
                 URL: https://issues.apache.org/jira/browse/NIFI-7889
             Project: Apache NiFi
          Issue Type: Improvement
          Components: Extensions
            Reporter: Pierre Villard


In ConsumeMQTT, we are filling the internal queue of the processor using this 
code:
{code:java}
        if (mqttQueue.size() >= maxQueueSize){
            throw new IllegalStateException("The subscriber queue is full, 
cannot receive another message until the processor is scheduled to run.");
        } else {
            mqttQueue.add(new MQTTQueueMessage(topic, message));
        }
{code}
Instead of throwing an exception when the internal queue is full, we could have 
a blocking call with {{offer}}() to give some time for the queue to be drained 
and then add the message to the queue. If the queue is still full, we'd throw 
the exception which would cause data loss in case the QoS is configured to 0.

Documentation should also be improved around the implications of such 
configuration.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to