markap14 commented on code in PR #6131:
URL: https://github.com/apache/nifi/pull/6131#discussion_r976577008
##########
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java:
##########
@@ -653,6 +636,131 @@ private void writeRecordData(final ProcessSession
session, final List<ConsumerRe
}
}
+ private RecordSetWriter writeRecord(final ProcessSession session, final
ConsumerRecord<byte[], byte[]> consumerRecord, final TopicPartition
topicPartition,
+ final Record record, final Map<String,
String> attributes) throws SchemaNotFoundException, IOException {
+ RecordSetWriter writer = null;
+ // Determine the bundle for this record.
+ final RecordSchema recordSchema = record.getSchema();
+ final BundleInformation bundleInfo = new
BundleInformation(topicPartition, recordSchema, attributes, separateByKey ?
consumerRecord.key() : null);
+
+ BundleTracker tracker = bundleMap.get(bundleInfo);
+ if (tracker == null) {
+ FlowFile flowFile = session.create();
+ flowFile = session.putAllAttributes(flowFile, attributes);
+
+ final OutputStream rawOut = session.write(flowFile);
+
+ final RecordSchema writeSchema;
+ try {
+ writeSchema =
writerFactory.getSchema(flowFile.getAttributes(), recordSchema);
+ } catch (final Exception e) {
+ logger.error("Failed to obtain Schema for FlowFile. Will
roll back the Kafka message offsets.", e);
+
+ rollback(topicPartition);
+ yield();
+
+ throw new ProcessException(e);
+ }
+
+ writer = writerFactory.createWriter(logger, writeSchema,
rawOut, flowFile);
+ writer.beginRecordSet();
+
+ tracker = new BundleTracker(consumerRecord, topicPartition,
keyEncoding, writer);
+ tracker.updateFlowFile(flowFile);
+ bundleMap.put(bundleInfo, tracker);
+ } else {
+ writer = tracker.recordWriter;
+ }
+
+ try {
+ writer.write(record);
+ } catch (final RuntimeException re) {
+ handleParseFailure(consumerRecord, session, re, "Failed to
write message from Kafka using the configured Record Writer. "
+ + "Will route message as its own FlowFile to the
'parse.failure' relationship");
+ return writer;
+ }
+
+ tracker.incrementRecordCount(1L, consumerRecord.offset(),
consumerRecord.leaderEpoch().orElse(null));
+ session.adjustCounter("Records Received", 1L, false);
+ return writer;
+ }
+
+ private MapRecord toWrapperRecord(final ConsumerRecord<byte[], byte[]>
consumerRecord, final Record record)
+ throws IOException, SchemaNotFoundException,
MalformedRecordException {
+ final Tuple<RecordField, Object> tupleKey =
toWrapperRecordKey(consumerRecord);
+ final Tuple<RecordField, Object> tupleValue =
toWrapperRecordValue(record);
+ final Tuple<RecordField, Object> tupleHeaders =
toWrapperRecordHeaders(consumerRecord);
+ final Tuple<RecordField, Object> tupleMetadata =
toWrapperRecordMetadata(consumerRecord);
+ final RecordSchema rootRecordSchema = new
SimpleRecordSchema(Arrays.asList(
+ tupleKey.getKey(), tupleValue.getKey(), tupleHeaders.getKey(),
tupleMetadata.getKey()));
+
+ final Map<String, Object> recordValues = new HashMap<>();
+ recordValues.put(tupleKey.getKey().getFieldName(),
tupleKey.getValue());
+ recordValues.put(tupleValue.getKey().getFieldName(),
tupleValue.getValue());
+ recordValues.put(tupleHeaders.getKey().getFieldName(),
tupleHeaders.getValue());
+ recordValues.put(tupleMetadata.getKey().getFieldName(),
tupleMetadata.getValue());
+ return new MapRecord(rootRecordSchema, recordValues);
+ }
+
+ private Tuple<RecordField, Object> toWrapperRecordKey(final
ConsumerRecord<byte[], byte[]> consumerRecord)
+ throws IOException, SchemaNotFoundException,
MalformedRecordException {
+ final Tuple<RecordField, Object> tuple;
+ final byte[] key = consumerRecord.key() == null ? new byte[0] :
consumerRecord.key();
+ if (KafkaProcessorUtils.KEY_AS_RECORD.getValue().equals(keyFormat)) {
+ final Map<String, String> attributes =
getAttributes(consumerRecord);
+ try (final InputStream is = new ByteArrayInputStream(key)) {
+ try (final RecordReader reader =
keyReaderFactory.createRecordReader(attributes, is, key.length, logger)) {
+ final Record record = reader.nextRecord();
+ final RecordField recordField = new RecordField("key",
RecordFieldType.RECORD.getRecordDataType(record.getSchema()));
+ tuple = new Tuple<>(recordField, record);
+ }
+ }
+ } else if
(KafkaProcessorUtils.KEY_AS_STRING.getValue().equals(keyFormat)) {
+ final RecordField recordField = new RecordField("key",
RecordFieldType.STRING.getDataType());
+ tuple = new Tuple<>(recordField, new String(key,
StandardCharsets.UTF_8));
Review Comment:
If the original key was null, we should not use `new String(key,
StandardCharsets.UTF_8)`, because that produces an empty string rather than
`null`. Instead, we should just use `null`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]