pvillard31 commented on a change in pull request #4738:
URL: https://github.com/apache/nifi/pull/4738#discussion_r551542640
##########
File path:
nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java
##########
@@ -322,14 +425,203 @@ public void process(final OutputStream out) throws
IOException {
if (!mqttQueue.remove(mqttMessage) && logger.isWarnEnabled()) {
logger.warn(new StringBuilder("FlowFile ")
.append(messageFlowfile.getAttribute(CoreAttributes.UUID.key()))
- .append(" for Mqtt message ")
+ .append(" for MQTT message ")
.append(mqttMessage)
.append(" had already been removed from queue,
possible duplication of flow files")
.toString());
}
}
}
+ private void transferQueueDemarcator(final ProcessContext context, final
ProcessSession session){
+ final byte[] demarcator =
context.getProperty(MESSAGE_DEMARCATOR).evaluateAttributeExpressions().getValue().getBytes(StandardCharsets.UTF_8);
+
+ FlowFile messageFlowfile = session.create();
+ session.putAttribute(messageFlowfile, BROKER_ATTRIBUTE_KEY, broker);
+
+
+ messageFlowfile = session.append(messageFlowfile, out -> {
+ while (!mqttQueue.isEmpty()) {
+ final MQTTQueueMessage mqttMessage = mqttQueue.poll();
+ out.write(mqttMessage.getPayload());
+ out.write(demarcator);
+ session.adjustCounter("Records Received", 1L, false);
+ }
+ });
+
+ session.getProvenanceReporter().receive(messageFlowfile, new
StringBuilder(broker).append(topicPrefix).append(topicFilter).toString());
+ session.transfer(messageFlowfile, REL_MESSAGE);
+ session.commit();
+ }
+
+ private void transferFailure(final ProcessSession session, final
MQTTQueueMessage mqttMessage) {
+ FlowFile messageFlowfile = session.create();
+
+ Map<String, String> attrs = new HashMap<>();
+ attrs.put(BROKER_ATTRIBUTE_KEY, broker);
+ attrs.put(TOPIC_ATTRIBUTE_KEY, mqttMessage.getTopic());
+ attrs.put(QOS_ATTRIBUTE_KEY, String.valueOf(mqttMessage.getQos()));
+ attrs.put(IS_DUPLICATE_ATTRIBUTE_KEY,
String.valueOf(mqttMessage.isDuplicate()));
+ attrs.put(IS_RETAINED_ATTRIBUTE_KEY,
String.valueOf(mqttMessage.isRetained()));
+
+ messageFlowfile = session.putAllAttributes(messageFlowfile, attrs);
+
+ messageFlowfile = session.write(messageFlowfile, new
OutputStreamCallback() {
+ @Override
+ public void process(final OutputStream out) throws IOException {
+ out.write(mqttMessage.getPayload());
+ }
+ });
+
+ String transitUri = new
StringBuilder(broker).append(mqttMessage.getTopic()).toString();
+ session.getProvenanceReporter().receive(messageFlowfile, transitUri);
+ session.transfer(messageFlowfile, REL_PARSE_FAILURE);
+ session.adjustCounter("Parse Failures", 1, false);
+ }
+
+ private void transferQueueRecord(final ProcessContext context, final
ProcessSession session){
+ final RecordReaderFactory readerFactory =
context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+ final RecordSetWriterFactory writerFactory =
context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+
+ FlowFile flowFile = session.create();
+ session.putAttribute(flowFile, BROKER_ATTRIBUTE_KEY, broker);
+
+ final Map<String, String> attributes = new HashMap<>();
+ final AtomicInteger recordCount = new AtomicInteger();
+
+ final List<MQTTQueueMessage> doneList = new
ArrayList<MQTTQueueMessage>();
+
+ RecordSetWriter writer = null;
+ boolean isWriterInitialized = false;
+
+ try {
+ while (!mqttQueue.isEmpty()) {
+ final MQTTQueueMessage mqttMessage = mqttQueue.take();
+ final byte[] recordBytes = mqttMessage.getPayload() == null ?
new byte[0] : mqttMessage.getPayload();
+
+ try (final InputStream in = new
ByteArrayInputStream(recordBytes)) {
+ final RecordReader reader;
+
+ try {
+ reader = readerFactory.createRecordReader(attributes,
in, recordBytes.length, logger);
+ } catch (final Exception e) {
+ logger.error("Failed to parse the message from the
internal queue, sending to the parse failure relationship", e);
+ transferFailure(session, mqttMessage);
+ continue;
+ }
+
+ try {
+ Record record;
+ while ((record = reader.nextRecord()) != null) {
+
+ if(!isWriterInitialized) {
+ final RecordSchema recordSchema =
record.getSchema();
+ final OutputStream rawOut =
session.write(flowFile);
+
+ RecordSchema writeSchema;
+ try {
+ writeSchema =
writerFactory.getSchema(flowFile.getAttributes(), recordSchema);
+
if(context.getProperty(ADD_ATTRIBUTES_AS_FIELDS).asBoolean()) {
+ final List<RecordField> fields = new
ArrayList<>();
+ fields.addAll(writeSchema.getFields());
+
+ fields.add(new
RecordField(TOPIC_FIELD_KEY, RecordFieldType.STRING.getDataType()));
+ fields.add(new
RecordField(QOS_FIELD_KEY, RecordFieldType.INT.getDataType()));
+ fields.add(new
RecordField(IS_DUPLICATE_FIELD_KEY, RecordFieldType.BOOLEAN.getDataType()));
+ fields.add(new
RecordField(IS_RETAINED_FIELD_KEY, RecordFieldType.BOOLEAN.getDataType()));
+
+ writeSchema = new
SimpleRecordSchema(fields);
+ }
+ } catch (final Exception e) {
+ logger.error("Failed to obtain Schema for
FlowFile, sending to the parse failure relationship", e);
+ transferFailure(session, mqttMessage);
+ continue;
+ }
+
+ writer = writerFactory.createWriter(logger,
writeSchema, rawOut, flowFile);
+ writer.beginRecordSet();
+ }
+
+ try {
+
if(context.getProperty(ADD_ATTRIBUTES_AS_FIELDS).asBoolean()) {
+ record.setValue(TOPIC_FIELD_KEY,
mqttMessage.getTopic());
+ record.setValue(QOS_FIELD_KEY,
mqttMessage.getQos());
+ record.setValue(IS_RETAINED_FIELD_KEY,
mqttMessage.isRetained());
+ record.setValue(IS_DUPLICATE_FIELD_KEY,
mqttMessage.isDuplicate());
+ }
+ writer.write(record);
+ isWriterInitialized = true;
+ doneList.add(mqttMessage);
+ } catch (final RuntimeException re) {
+ logger.error("Failed to write message using
the configured Record Writer, sending to the parse failure relationship", re);
+ transferFailure(session, mqttMessage);
+ continue;
+ }
+
+ session.adjustCounter("Records Received", 1L,
false);
+ }
+ } catch (final IOException | MalformedRecordException |
SchemaValidationException e) {
+ logger.error("Failed to write message, sending to the
parse failure relationship", e);
+ transferFailure(session, mqttMessage);
+ continue;
+ }
+ } catch (Exception e) {
+ logger.error("Failed to write message, sending to the
parse failure relationship", e);
+ transferFailure(session, mqttMessage);
+ continue;
+ }
+ }
+
+ if(writer != null) {
+ final WriteResult writeResult = writer.finishRecordSet();
+ attributes.put("record.count",
String.valueOf(writeResult.getRecordCount()));
+ attributes.put(CoreAttributes.MIME_TYPE.key(),
writer.getMimeType());
+ attributes.putAll(writeResult.getAttributes());
+ recordCount.set(writeResult.getRecordCount());
+ }
+
+ } catch (final Exception e) {
+ context.yield();
+
+ // we try to add the messages back into the internal queue
+ for(MQTTQueueMessage done : doneList) {
+ try {
+ mqttQueue.offer(done, 1, TimeUnit.SECONDS);
+ } catch (InterruptedException ex) {
+ logger.error("Could not add message back into the internal
queue, this could lead to data loss", ex);
Review comment:
The only option I see is to log the payload, but IMO this would raise
security concerns. Instead, I can change the loop so that the error is logged
only once, stating how many messages could not be re-inserted into the queue.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]