pvillard31 commented on code in PR #9530: URL: https://github.com/apache/nifi/pull/9530#discussion_r1946412992
########## nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/consume/RecordStreamPubSubMessageConverter.java: ########## @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.gcp.pubsub.consume; + +import com.google.pubsub.v1.ReceivedMessage; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processors.gcp.pubsub.ConsumeGCPubSub; +import org.apache.nifi.provenance.ProvenanceReporter; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.MalformedRecordException; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.RecordSetWriter; +import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.serialization.SimpleRecordSchema; +import org.apache.nifi.serialization.WriteResult; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordSchema; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.ACK_ID_ATTRIBUTE; +import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MESSAGE_ID_ATTRIBUTE; +import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MSG_ATTRIBUTES_COUNT_ATTRIBUTE; +import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MSG_PUBLISH_TIME_ATTRIBUTE; +import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.SERIALIZED_SIZE_ATTRIBUTE; +import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.SUBSCRIPTION_NAME_ATTRIBUTE; + +public class RecordStreamPubSubMessageConverter implements PubSubMessageConverter { + private static final RecordSchema EMPTY_SCHEMA = new 
SimpleRecordSchema(List.of()); + + private final RecordReaderFactory readerFactory; + private final RecordSetWriterFactory writerFactory; + private final ComponentLog logger; + + public RecordStreamPubSubMessageConverter( + final RecordReaderFactory readerFactory, + final RecordSetWriterFactory writerFactory, + final ComponentLog logger) { + this.readerFactory = readerFactory; + this.writerFactory = writerFactory; + this.logger = logger; + } + + @Override + public void toFlowFiles(final ProcessSession session, final List<ReceivedMessage> messages, final List<String> ackIds, final String subscriptionName) { + try { + final Map<RecordGroupCriteria, RecordGroup> recordGroups = new HashMap<>(); + final Map<String, String> attributes = new HashMap<>(); + + for (ReceivedMessage message : messages) { + if (message.hasMessage()) { + byte[] payload = message.getMessage().getData().toByteArray(); + try (final InputStream in = new ByteArrayInputStream(payload); + final RecordReader valueRecordReader = readerFactory.createRecordReader(attributes, in, payload.length, logger)) { + + while (true) { + final Record record = valueRecordReader.nextRecord(); + + if (record == null) { + break; + } + + final RecordSchema recordSchema = record == null ? 
EMPTY_SCHEMA : record.getSchema(); + final RecordSchema writeSchema = writerFactory.getSchema(attributes, recordSchema); + + // Get/Register the Record Group that is associated with the schema for this + // message + final RecordGroupCriteria groupCriteria = new RecordGroupCriteria(writeSchema); + RecordGroup recordGroup = recordGroups.get(groupCriteria); + if (recordGroup == null) { + FlowFile flowFile = session.create(); + final OutputStream out = session.write(flowFile); + final RecordSetWriter writer; + try { + writer = writerFactory.createWriter(logger, writeSchema, out, attributes); + writer.beginRecordSet(); + } catch (final Exception e) { + out.close(); + throw e; + } + + recordGroup = new RecordGroup(flowFile, writer); + recordGroups.put(groupCriteria, recordGroup); + } + + // Create the Record object and write it to the Record Writer. + if (record != null) { + recordGroup.writer().write(record); + } + } + } catch (final MalformedRecordException | IOException | SchemaNotFoundException e) { + // Failed to parse the record. 
Transfer to a 'parse.failure' relationship + FlowFile flowFile = session.create(); + flowFile = session.putAllAttributes(flowFile, message.getMessage().getAttributesMap()); + flowFile = session.putAttribute(flowFile, ACK_ID_ATTRIBUTE, message.getAckId()); + flowFile = session.putAttribute(flowFile, SERIALIZED_SIZE_ATTRIBUTE, String.valueOf(message.getSerializedSize())); + flowFile = session.putAttribute(flowFile, MESSAGE_ID_ATTRIBUTE, message.getMessage().getMessageId()); + flowFile = session.putAttribute(flowFile, MSG_ATTRIBUTES_COUNT_ATTRIBUTE, String.valueOf(message.getMessage().getAttributesCount())); + flowFile = session.putAttribute(flowFile, MSG_PUBLISH_TIME_ATTRIBUTE, String.valueOf(message.getMessage().getPublishTime().getSeconds())); + flowFile = session.putAttribute(flowFile, SUBSCRIPTION_NAME_ATTRIBUTE, subscriptionName); + flowFile = session.write(flowFile, out -> out.write(payload)); + session.transfer(flowFile, ConsumeGCPubSub.REL_PARSE_FAILURE); + session.adjustCounter("Records Received from " + subscriptionName, 1, false); + + // Track the ack ID for the message + ackIds.add(message.getAckId()); Review Comment: In my mind, yes, because we still consumed the message (it was sent to the parse-failure relationship). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
