turcsanyip commented on a change in pull request #4738:
URL: https://github.com/apache/nifi/pull/4738#discussion_r561332768



##########
File path: 
nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java
##########
@@ -334,14 +443,210 @@ public void process(final OutputStream out) throws 
IOException {
             if (!mqttQueue.remove(mqttMessage) && logger.isWarnEnabled()) {
                 logger.warn(new StringBuilder("FlowFile ")
                         
.append(messageFlowfile.getAttribute(CoreAttributes.UUID.key()))
-                        .append(" for Mqtt message ")
+                        .append(" for MQTT message ")
                         .append(mqttMessage)
                         .append(" had already been removed from queue, 
possible duplication of flow files")
                         .toString());
             }
         }
     }
 
+    private void transferQueueDemarcator(final ProcessContext context, final 
ProcessSession session){
+        final byte[] demarcator = 
context.getProperty(MESSAGE_DEMARCATOR).evaluateAttributeExpressions().getValue().getBytes(StandardCharsets.UTF_8);
+
+        FlowFile messageFlowfile = session.create();
+        session.putAttribute(messageFlowfile, BROKER_ATTRIBUTE_KEY, broker);
+
+
+        messageFlowfile = session.append(messageFlowfile, out -> {
+            while (!mqttQueue.isEmpty()) {
+                final MQTTQueueMessage mqttMessage = mqttQueue.poll();
+                out.write(mqttMessage.getPayload());
+                out.write(demarcator);
+                session.adjustCounter(COUNTER_RECORDS_RECEIVED, 1L, false);
+            }
+        });
+
+        session.getProvenanceReporter().receive(messageFlowfile, new 
StringBuilder(broker).append(topicPrefix).append(topicFilter).toString());

Review comment:
       There is no separator character between the broker and the topic prefix 
(e.g. `tcp://myhost:1883mytopic`).
   A `'/'` could be added before the topic prefix.
   The same change could be made in the existing `transferQueue()` method as well. 

##########
File path: 
nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java
##########
@@ -334,14 +443,210 @@ public void process(final OutputStream out) throws 
IOException {
             if (!mqttQueue.remove(mqttMessage) && logger.isWarnEnabled()) {
                 logger.warn(new StringBuilder("FlowFile ")
                         
.append(messageFlowfile.getAttribute(CoreAttributes.UUID.key()))
-                        .append(" for Mqtt message ")
+                        .append(" for MQTT message ")
                         .append(mqttMessage)
                         .append(" had already been removed from queue, 
possible duplication of flow files")
                         .toString());
             }
         }
     }
 
+    private void transferQueueDemarcator(final ProcessContext context, final 
ProcessSession session){
+        final byte[] demarcator = 
context.getProperty(MESSAGE_DEMARCATOR).evaluateAttributeExpressions().getValue().getBytes(StandardCharsets.UTF_8);
+
+        FlowFile messageFlowfile = session.create();
+        session.putAttribute(messageFlowfile, BROKER_ATTRIBUTE_KEY, broker);
+
+
+        messageFlowfile = session.append(messageFlowfile, out -> {
+            while (!mqttQueue.isEmpty()) {

Review comment:
       Draining the queue until it is empty seems somewhat non-deterministic to me, because 
the receiver thread keeps writing to the queue at the same time.
   Wouldn't it be useful to define a maximum number of messages that may be fetched in one go 
(either a hard-coded constant or a processor property)? 

##########
File path: 
nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java
##########
@@ -334,14 +443,210 @@ public void process(final OutputStream out) throws 
IOException {
             if (!mqttQueue.remove(mqttMessage) && logger.isWarnEnabled()) {
                 logger.warn(new StringBuilder("FlowFile ")
                         
.append(messageFlowfile.getAttribute(CoreAttributes.UUID.key()))
-                        .append(" for Mqtt message ")
+                        .append(" for MQTT message ")
                         .append(mqttMessage)
                         .append(" had already been removed from queue, 
possible duplication of flow files")
                         .toString());
             }
         }
     }
 
+    private void transferQueueDemarcator(final ProcessContext context, final 
ProcessSession session){
+        final byte[] demarcator = 
context.getProperty(MESSAGE_DEMARCATOR).evaluateAttributeExpressions().getValue().getBytes(StandardCharsets.UTF_8);
+
+        FlowFile messageFlowfile = session.create();
+        session.putAttribute(messageFlowfile, BROKER_ATTRIBUTE_KEY, broker);
+
+
+        messageFlowfile = session.append(messageFlowfile, out -> {
+            while (!mqttQueue.isEmpty()) {
+                final MQTTQueueMessage mqttMessage = mqttQueue.poll();
+                out.write(mqttMessage.getPayload());
+                out.write(demarcator);
+                session.adjustCounter(COUNTER_RECORDS_RECEIVED, 1L, false);
+            }
+        });
+
+        session.getProvenanceReporter().receive(messageFlowfile, new 
StringBuilder(broker).append(topicPrefix).append(topicFilter).toString());
+        session.transfer(messageFlowfile, REL_MESSAGE);
+        session.commit();
+    }
+
+    private void transferFailure(final ProcessSession session, final 
MQTTQueueMessage mqttMessage) {
+        FlowFile messageFlowfile = session.create();
+
+        Map<String, String> attrs = new HashMap<>();
+        attrs.put(BROKER_ATTRIBUTE_KEY, broker);
+        attrs.put(TOPIC_ATTRIBUTE_KEY, mqttMessage.getTopic());
+        attrs.put(QOS_ATTRIBUTE_KEY, String.valueOf(mqttMessage.getQos()));
+        attrs.put(IS_DUPLICATE_ATTRIBUTE_KEY, 
String.valueOf(mqttMessage.isDuplicate()));
+        attrs.put(IS_RETAINED_ATTRIBUTE_KEY, 
String.valueOf(mqttMessage.isRetained()));
+
+        messageFlowfile = session.putAllAttributes(messageFlowfile, attrs);
+
+        messageFlowfile = session.write(messageFlowfile, new 
OutputStreamCallback() {
+            @Override
+            public void process(final OutputStream out) throws IOException {
+                out.write(mqttMessage.getPayload());
+            }
+        });
+
+        String transitUri = new 
StringBuilder(broker).append(mqttMessage.getTopic()).toString();
+        session.getProvenanceReporter().receive(messageFlowfile, transitUri);
+        session.transfer(messageFlowfile, REL_PARSE_FAILURE);
+        session.adjustCounter(COUNTER_PARSE_FAILURES, 1, false);
+    }
+
+    private void transferQueueRecord(final ProcessContext context, final 
ProcessSession session){
+        final RecordReaderFactory readerFactory = 
context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+        final RecordSetWriterFactory writerFactory = 
context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+
+        FlowFile flowFile = session.create();
+        session.putAttribute(flowFile, BROKER_ATTRIBUTE_KEY, broker);
+
+        final Map<String, String> attributes = new HashMap<>();
+        final AtomicInteger recordCount = new AtomicInteger();
+
+        final List<MQTTQueueMessage> doneList = new 
ArrayList<MQTTQueueMessage>();
+
+        RecordSetWriter writer = null;
+        boolean isWriterInitialized = false;
+
+        try {
+            while (!mqttQueue.isEmpty()) {
+                final MQTTQueueMessage mqttMessage = mqttQueue.take();
+                final byte[] recordBytes = mqttMessage.getPayload() == null ? 
new byte[0] : mqttMessage.getPayload();
+
+                try (final InputStream in = new 
ByteArrayInputStream(recordBytes)) {
+                    final RecordReader reader;
+
+                    try {
+                        reader = readerFactory.createRecordReader(attributes, 
in, recordBytes.length, logger);
+                    } catch (final Exception e) {
+                        logger.error("Failed to parse the message from the 
internal queue, sending to the parse failure relationship", e);
+                        transferFailure(session, mqttMessage);
+                        continue;
+                    }
+
+                    try {
+                        Record record;
+                        while ((record = reader.nextRecord()) != null) {
+
+                            if(!isWriterInitialized) {
+                                final RecordSchema recordSchema = 
record.getSchema();
+                                final OutputStream rawOut = 
session.write(flowFile);
+
+                                RecordSchema writeSchema;
+                                try {
+                                    writeSchema = 
writerFactory.getSchema(flowFile.getAttributes(), recordSchema);
+                                    
if(context.getProperty(ADD_ATTRIBUTES_AS_FIELDS).asBoolean()) {
+                                        final List<RecordField> fields = new 
ArrayList<>();
+                                        fields.addAll(writeSchema.getFields());
+
+                                        fields.add(new 
RecordField(TOPIC_FIELD_KEY, RecordFieldType.STRING.getDataType()));
+                                        fields.add(new 
RecordField(QOS_FIELD_KEY, RecordFieldType.INT.getDataType()));
+                                        fields.add(new 
RecordField(IS_DUPLICATE_FIELD_KEY, RecordFieldType.BOOLEAN.getDataType()));
+                                        fields.add(new 
RecordField(IS_RETAINED_FIELD_KEY, RecordFieldType.BOOLEAN.getDataType()));
+
+                                        writeSchema = new 
SimpleRecordSchema(fields);
+                                    }
+                                } catch (final Exception e) {
+                                    logger.error("Failed to obtain Schema for 
FlowFile, sending to the parse failure relationship", e);
+                                    transferFailure(session, mqttMessage);
+                                    continue;
+                                }
+
+                                writer = writerFactory.createWriter(logger, 
writeSchema, rawOut, flowFile);
+                                writer.beginRecordSet();
+                            }
+
+                            try {
+                                
if(context.getProperty(ADD_ATTRIBUTES_AS_FIELDS).asBoolean()) {
+                                    record.setValue(TOPIC_FIELD_KEY, 
mqttMessage.getTopic());
+                                    record.setValue(QOS_FIELD_KEY, 
mqttMessage.getQos());
+                                    record.setValue(IS_RETAINED_FIELD_KEY, 
mqttMessage.isRetained());
+                                    record.setValue(IS_DUPLICATE_FIELD_KEY, 
mqttMessage.isDuplicate());
+                                }
+                                writer.write(record);
+                                isWriterInitialized = true;
+                                doneList.add(mqttMessage);
+                            } catch (final RuntimeException re) {
+                                logger.error("Failed to write message using 
the configured Record Writer, sending to the parse failure relationship", re);
+                                transferFailure(session, mqttMessage);
+                                continue;
+                            }
+
+                            session.adjustCounter(COUNTER_RECORDS_RECEIVED, 
1L, false);
+                        }
+                    } catch (final IOException | MalformedRecordException | 
SchemaValidationException e) {
+                        logger.error("Failed to write message, sending to the 
parse failure relationship", e);
+                        transferFailure(session, mqttMessage);
+                        continue;
+                    }
+                } catch (Exception e) {
+                    logger.error("Failed to write message, sending to the 
parse failure relationship", e);
+                    transferFailure(session, mqttMessage);
+                    continue;
+                }
+            }
+
+            if(writer != null) {
+                final WriteResult writeResult = writer.finishRecordSet();
+                attributes.put(RECORD_COUNT_KEY, 
String.valueOf(writeResult.getRecordCount()));
+                attributes.put(CoreAttributes.MIME_TYPE.key(), 
writer.getMimeType());
+                attributes.putAll(writeResult.getAttributes());
+                recordCount.set(writeResult.getRecordCount());
+            }
+
+        } catch (final Exception e) {
+            context.yield();
+
+            // we try to add the messages back into the internal queue
+            int numberOfMessages = 0;
+            for(MQTTQueueMessage done : doneList) {
+                try {
+                    mqttQueue.offer(done, 1, TimeUnit.SECONDS);
+                } catch (InterruptedException ex) {
+                    numberOfMessages++;
+                    if(getLogger().isDebugEnabled()) {
+                        logger.debug("Could not add message back into the 
internal queue, this could lead to data loss", ex);
+                    }
+                }
+            }
+            if(numberOfMessages > 0) {
+                logger.error("Could not add {} message(s) back into the 
internal queue, this could mean data loss", new Object[] {numberOfMessages});
+            }
+
+            throw new ProcessException("Could not process data received from 
the MQTT broker(s): " + broker, e);
+        } finally {
+            closeWriter(writer);
+        }
+
+        if(recordCount.get() == 0) {
+            session.remove(flowFile);
+            return;
+        }
+
+        session.putAllAttributes(flowFile, attributes);
+        session.getProvenanceReporter().receive(flowFile, broker);

Review comment:
       The topic information could be added to the transit URI, as in the demarcator method.

##########
File path: 
nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java
##########
@@ -334,14 +443,210 @@ public void process(final OutputStream out) throws 
IOException {
             if (!mqttQueue.remove(mqttMessage) && logger.isWarnEnabled()) {
                 logger.warn(new StringBuilder("FlowFile ")
                         
.append(messageFlowfile.getAttribute(CoreAttributes.UUID.key()))
-                        .append(" for Mqtt message ")
+                        .append(" for MQTT message ")
                         .append(mqttMessage)
                         .append(" had already been removed from queue, 
possible duplication of flow files")
                         .toString());
             }
         }
     }
 
+    private void transferQueueDemarcator(final ProcessContext context, final 
ProcessSession session){
+        final byte[] demarcator = 
context.getProperty(MESSAGE_DEMARCATOR).evaluateAttributeExpressions().getValue().getBytes(StandardCharsets.UTF_8);
+
+        FlowFile messageFlowfile = session.create();
+        session.putAttribute(messageFlowfile, BROKER_ATTRIBUTE_KEY, broker);
+
+
+        messageFlowfile = session.append(messageFlowfile, out -> {
+            while (!mqttQueue.isEmpty()) {
+                final MQTTQueueMessage mqttMessage = mqttQueue.poll();
+                out.write(mqttMessage.getPayload());
+                out.write(demarcator);
+                session.adjustCounter(COUNTER_RECORDS_RECEIVED, 1L, false);
+            }
+        });
+
+        session.getProvenanceReporter().receive(messageFlowfile, new 
StringBuilder(broker).append(topicPrefix).append(topicFilter).toString());
+        session.transfer(messageFlowfile, REL_MESSAGE);
+        session.commit();
+    }
+
+    private void transferFailure(final ProcessSession session, final 
MQTTQueueMessage mqttMessage) {
+        FlowFile messageFlowfile = session.create();
+
+        Map<String, String> attrs = new HashMap<>();
+        attrs.put(BROKER_ATTRIBUTE_KEY, broker);
+        attrs.put(TOPIC_ATTRIBUTE_KEY, mqttMessage.getTopic());
+        attrs.put(QOS_ATTRIBUTE_KEY, String.valueOf(mqttMessage.getQos()));
+        attrs.put(IS_DUPLICATE_ATTRIBUTE_KEY, 
String.valueOf(mqttMessage.isDuplicate()));
+        attrs.put(IS_RETAINED_ATTRIBUTE_KEY, 
String.valueOf(mqttMessage.isRetained()));
+
+        messageFlowfile = session.putAllAttributes(messageFlowfile, attrs);
+
+        messageFlowfile = session.write(messageFlowfile, new 
OutputStreamCallback() {
+            @Override
+            public void process(final OutputStream out) throws IOException {
+                out.write(mqttMessage.getPayload());
+            }
+        });
+
+        String transitUri = new 
StringBuilder(broker).append(mqttMessage.getTopic()).toString();
+        session.getProvenanceReporter().receive(messageFlowfile, transitUri);
+        session.transfer(messageFlowfile, REL_PARSE_FAILURE);
+        session.adjustCounter(COUNTER_PARSE_FAILURES, 1, false);
+    }
+
+    private void transferQueueRecord(final ProcessContext context, final 
ProcessSession session){
+        final RecordReaderFactory readerFactory = 
context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+        final RecordSetWriterFactory writerFactory = 
context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+
+        FlowFile flowFile = session.create();
+        session.putAttribute(flowFile, BROKER_ATTRIBUTE_KEY, broker);
+
+        final Map<String, String> attributes = new HashMap<>();
+        final AtomicInteger recordCount = new AtomicInteger();
+
+        final List<MQTTQueueMessage> doneList = new 
ArrayList<MQTTQueueMessage>();
+
+        RecordSetWriter writer = null;
+        boolean isWriterInitialized = false;
+
+        try {
+            while (!mqttQueue.isEmpty()) {
+                final MQTTQueueMessage mqttMessage = mqttQueue.take();
+                final byte[] recordBytes = mqttMessage.getPayload() == null ? 
new byte[0] : mqttMessage.getPayload();

Review comment:
       The payload is not checked for `null` in the other two `transferQueue*` methods. Should 
it be checked there too, or is the check unnecessary here?

##########
File path: 
nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java
##########
@@ -334,14 +443,210 @@ public void process(final OutputStream out) throws 
IOException {
             if (!mqttQueue.remove(mqttMessage) && logger.isWarnEnabled()) {
                 logger.warn(new StringBuilder("FlowFile ")
                         
.append(messageFlowfile.getAttribute(CoreAttributes.UUID.key()))
-                        .append(" for Mqtt message ")
+                        .append(" for MQTT message ")
                         .append(mqttMessage)
                         .append(" had already been removed from queue, 
possible duplication of flow files")
                         .toString());
             }
         }
     }
 
+    private void transferQueueDemarcator(final ProcessContext context, final 
ProcessSession session){
+        final byte[] demarcator = 
context.getProperty(MESSAGE_DEMARCATOR).evaluateAttributeExpressions().getValue().getBytes(StandardCharsets.UTF_8);
+
+        FlowFile messageFlowfile = session.create();
+        session.putAttribute(messageFlowfile, BROKER_ATTRIBUTE_KEY, broker);
+
+
+        messageFlowfile = session.append(messageFlowfile, out -> {
+            while (!mqttQueue.isEmpty()) {
+                final MQTTQueueMessage mqttMessage = mqttQueue.poll();
+                out.write(mqttMessage.getPayload());
+                out.write(demarcator);
+                session.adjustCounter(COUNTER_RECORDS_RECEIVED, 1L, false);
+            }
+        });
+
+        session.getProvenanceReporter().receive(messageFlowfile, new 
StringBuilder(broker).append(topicPrefix).append(topicFilter).toString());
+        session.transfer(messageFlowfile, REL_MESSAGE);
+        session.commit();
+    }
+
+    private void transferFailure(final ProcessSession session, final 
MQTTQueueMessage mqttMessage) {
+        FlowFile messageFlowfile = session.create();
+
+        Map<String, String> attrs = new HashMap<>();
+        attrs.put(BROKER_ATTRIBUTE_KEY, broker);
+        attrs.put(TOPIC_ATTRIBUTE_KEY, mqttMessage.getTopic());
+        attrs.put(QOS_ATTRIBUTE_KEY, String.valueOf(mqttMessage.getQos()));
+        attrs.put(IS_DUPLICATE_ATTRIBUTE_KEY, 
String.valueOf(mqttMessage.isDuplicate()));
+        attrs.put(IS_RETAINED_ATTRIBUTE_KEY, 
String.valueOf(mqttMessage.isRetained()));
+
+        messageFlowfile = session.putAllAttributes(messageFlowfile, attrs);
+
+        messageFlowfile = session.write(messageFlowfile, new 
OutputStreamCallback() {
+            @Override
+            public void process(final OutputStream out) throws IOException {
+                out.write(mqttMessage.getPayload());
+            }
+        });
+
+        String transitUri = new 
StringBuilder(broker).append(mqttMessage.getTopic()).toString();
+        session.getProvenanceReporter().receive(messageFlowfile, transitUri);
+        session.transfer(messageFlowfile, REL_PARSE_FAILURE);
+        session.adjustCounter(COUNTER_PARSE_FAILURES, 1, false);
+    }
+
+    private void transferQueueRecord(final ProcessContext context, final 
ProcessSession session){
+        final RecordReaderFactory readerFactory = 
context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+        final RecordSetWriterFactory writerFactory = 
context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+
+        FlowFile flowFile = session.create();
+        session.putAttribute(flowFile, BROKER_ATTRIBUTE_KEY, broker);
+
+        final Map<String, String> attributes = new HashMap<>();
+        final AtomicInteger recordCount = new AtomicInteger();
+
+        final List<MQTTQueueMessage> doneList = new 
ArrayList<MQTTQueueMessage>();
+
+        RecordSetWriter writer = null;
+        boolean isWriterInitialized = false;
+
+        try {
+            while (!mqttQueue.isEmpty()) {
+                final MQTTQueueMessage mqttMessage = mqttQueue.take();

Review comment:
       Is there a specific reason why `take()` is used here, while `poll()` is used in 
the other two `transferQueue*` methods?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to