[
https://issues.apache.org/jira/browse/NIFI-3739?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15991039#comment-15991039
]
ASF GitHub Bot commented on NIFI-3739:
--------------------------------------
Github user joewitt commented on a diff in the pull request:
https://github.com/apache/nifi/pull/1695#discussion_r114150476
--- Diff:
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
---
@@ -392,13 +394,27 @@ private void writeDemarcatedData(final ProcessSession
session, final List<Consum
bundleMap.put(topicPartition, tracker);
}
+ private void rollback(final TopicPartition topicPartition) {
+ final OffsetAndMetadata offsetAndMetadata =
kafkaConsumer.committed(topicPartition);
+ final long offset = offsetAndMetadata.offset();
+ kafkaConsumer.seek(topicPartition, offset);
+ }
+
private void writeRecordData(final ProcessSession session, final
List<ConsumerRecord<byte[], byte[]>> records, final TopicPartition
topicPartition) {
FlowFile flowFile = session.create();
try {
final RecordSetWriter writer;
try {
writer = writerFactory.createWriter(logger, flowFile, new
ByteArrayInputStream(new byte[0]));
} catch (final Exception e) {
+ logger.error(
+ "Failed to obtain a Record Writer for serializing
Kafka messages. This generally happens because the "
+ + "Record Writer cannot obtain the appropriate
Schema, due to failure to connect to a remote Schema Registry "
+ + "or due to the Schema Access Strategy being
dependent upon FlowFile Attributes that are not available. "
+ + "Will roll back the Kafka session.", e);
+
+ rollback(topicPartition);
--- End diff --
actually since we've not yet committed our offsets this is probably client
side only...so probably fine
> Create Processors for publishing records to and consuming records from Kafka
> ----------------------------------------------------------------------------
>
> Key: NIFI-3739
> URL: https://issues.apache.org/jira/browse/NIFI-3739
> Project: Apache NiFi
> Issue Type: New Feature
> Components: Extensions
> Reporter: Mark Payne
> Assignee: Mark Payne
> Fix For: 1.2.0
>
>
> With the new record readers & writers that have been added in now, it would
> be good to allow records to be pushed to and pulled from kafka. Currently, we
> support demarcated data but sometimes we can't correctly demarcate data in a
> way that keeps the format valid (json is a good example). We should have
> processors that use the record readers and writers for this.
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)