pvillard31 commented on a change in pull request #4738:
URL: https://github.com/apache/nifi/pull/4738#discussion_r551539908



##########
File path: 
nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java
##########
@@ -322,14 +425,203 @@ public void process(final OutputStream out) throws 
IOException {
             if (!mqttQueue.remove(mqttMessage) && logger.isWarnEnabled()) {
                 logger.warn(new StringBuilder("FlowFile ")
                         
.append(messageFlowfile.getAttribute(CoreAttributes.UUID.key()))
-                        .append(" for Mqtt message ")
+                        .append(" for MQTT message ")
                         .append(mqttMessage)
                         .append(" had already been removed from queue, 
possible duplication of flow files")
                         .toString());
             }
         }
     }
 
+    private void transferQueueDemarcator(final ProcessContext context, final 
ProcessSession session){
+        final byte[] demarcator = 
context.getProperty(MESSAGE_DEMARCATOR).evaluateAttributeExpressions().getValue().getBytes(StandardCharsets.UTF_8);
+
+        FlowFile messageFlowfile = session.create();
+        session.putAttribute(messageFlowfile, BROKER_ATTRIBUTE_KEY, broker);
+
+
+        messageFlowfile = session.append(messageFlowfile, out -> {
+            while (!mqttQueue.isEmpty()) {
+                final MQTTQueueMessage mqttMessage = mqttQueue.poll();
+                out.write(mqttMessage.getPayload());
+                out.write(demarcator);
+                session.adjustCounter("Records Received", 1L, false);
+            }
+        });
+
+        session.getProvenanceReporter().receive(messageFlowfile, new 
StringBuilder(broker).append(topicPrefix).append(topicFilter).toString());
+        session.transfer(messageFlowfile, REL_MESSAGE);
+        session.commit();
+    }
+
+    private void transferFailure(final ProcessSession session, final 
MQTTQueueMessage mqttMessage) {
+        FlowFile messageFlowfile = session.create();
+
+        Map<String, String> attrs = new HashMap<>();
+        attrs.put(BROKER_ATTRIBUTE_KEY, broker);
+        attrs.put(TOPIC_ATTRIBUTE_KEY, mqttMessage.getTopic());
+        attrs.put(QOS_ATTRIBUTE_KEY, String.valueOf(mqttMessage.getQos()));
+        attrs.put(IS_DUPLICATE_ATTRIBUTE_KEY, 
String.valueOf(mqttMessage.isDuplicate()));
+        attrs.put(IS_RETAINED_ATTRIBUTE_KEY, 
String.valueOf(mqttMessage.isRetained()));
+
+        messageFlowfile = session.putAllAttributes(messageFlowfile, attrs);
+
+        messageFlowfile = session.write(messageFlowfile, new 
OutputStreamCallback() {
+            @Override
+            public void process(final OutputStream out) throws IOException {
+                out.write(mqttMessage.getPayload());
+            }
+        });
+
+        String transitUri = new 
StringBuilder(broker).append(mqttMessage.getTopic()).toString();
+        session.getProvenanceReporter().receive(messageFlowfile, transitUri);
+        session.transfer(messageFlowfile, REL_PARSE_FAILURE);
+        session.adjustCounter("Parse Failures", 1, false);
+    }
+
+    private void transferQueueRecord(final ProcessContext context, final 
ProcessSession session){
+        final RecordReaderFactory readerFactory = 
context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+        final RecordSetWriterFactory writerFactory = 
context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+
+        FlowFile flowFile = session.create();
+        session.putAttribute(flowFile, BROKER_ATTRIBUTE_KEY, broker);
+
+        final Map<String, String> attributes = new HashMap<>();
+        final AtomicInteger recordCount = new AtomicInteger();
+
+        final List<MQTTQueueMessage> doneList = new 
ArrayList<MQTTQueueMessage>();
+
+        RecordSetWriter writer = null;
+        boolean isWriterInitialized = false;
+
+        try {
+            while (!mqttQueue.isEmpty()) {
+                final MQTTQueueMessage mqttMessage = mqttQueue.take();
+                final byte[] recordBytes = mqttMessage.getPayload() == null ? 
new byte[0] : mqttMessage.getPayload();
+
+                try (final InputStream in = new 
ByteArrayInputStream(recordBytes)) {
+                    final RecordReader reader;
+
+                    try {
+                        reader = readerFactory.createRecordReader(attributes, 
in, recordBytes.length, logger);
+                    } catch (final Exception e) {
+                        logger.error("Failed to parse the message from the 
internal queue, sending to the parse failure relationship", e);

Review comment:
       Not really, we just have the payload, QoS and topic.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Reply via email to