[
https://issues.apache.org/jira/browse/NIFI-5592?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16613852#comment-16613852
]
ASF GitHub Bot commented on NIFI-5592:
--------------------------------------
GitHub user markap14 opened a pull request:
https://github.com/apache/nifi/pull/3001
NIFI-5592: If an Exception is thrown by RecordReader.read() from ConsumeKafkaRecord, route Record to parse.failure relationship
Thank you for submitting a contribution to Apache NiFi.
In order to streamline the review of the contribution we ask you
to ensure the following steps have been taken:
### For all changes:
- [ ] Is there a JIRA ticket associated with this PR? Is it referenced
in the commit message?
- [ ] Does your PR title start with NIFI-XXXX where XXXX is the JIRA number
you are trying to resolve? Pay particular attention to the hyphen "-" character.
- [ ] Has your PR been rebased against the latest commit within the target
branch (typically master)?
- [ ] Is your initial contribution a single, squashed commit?
### For code changes:
- [ ] Have you ensured that the full suite of tests is executed via mvn
-Pcontrib-check clean install at the root nifi folder?
- [ ] Have you written or updated unit tests to verify your changes?
- [ ] If adding new dependencies to the code, are these dependencies
licensed in a way that is compatible for inclusion under [ASF
2.0](http://www.apache.org/legal/resolved.html#category-a)?
- [ ] If applicable, have you updated the LICENSE file, including the main
LICENSE file under nifi-assembly?
- [ ] If applicable, have you updated the NOTICE file, including the main
NOTICE file found under nifi-assembly?
- [ ] If adding new Properties, have you added .displayName in addition to
.name (programmatic access) for each of the new properties?
### For documentation related changes:
- [ ] Have you ensured that format looks appropriate for the output in
which it is rendered?
### Note:
Please ensure that once the PR is submitted, you check travis-ci for build
issues and submit an update to your PR as soon as possible.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/markap14/nifi NIFI-5592
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/nifi/pull/3001.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #3001
----
commit 3c751e9556e400cd55e6b28bee8d73e9ff97db2b
Author: Mark Payne <markap14@...>
Date: 2018-09-13T17:56:13Z
NIFI-5592: If an Exception is thrown by RecordReader.read() from
ConsumeKafkaRecord, route Record to parse.failure relationship
----
> ConsumeKafkaRecord* processors can stop pulling data if the data doesn't
> match the configured schema
> ----------------------------------------------------------------------------------------------------
>
> Key: NIFI-5592
> URL: https://issues.apache.org/jira/browse/NIFI-5592
> Project: Apache NiFi
> Issue Type: Bug
> Components: Extensions
> Reporter: Mark Payne
> Assignee: Mark Payne
> Priority: Major
>
> If the data in the Kafka topic does not adhere to the configured schema, the
> processor should route the data to the 'parse.failure' relationship, but under
> some conditions we may instead encounter the following SchemaValidationException:
> {code}
> 2018-09-13 07:37:54,196 ERROR [Timer-Driven Process Thread-1] o.a.n.p.k.pubsub.ConsumeKafkaRecord_1_0 ConsumeKafkaRecord_1_0[id=c258fa20-0165-1000-ffff-ffffb401d2c7] Exception while processing data from kafka so will close the lease org.apache.nifi.processors.kafka.pubsub.ConsumerPool$SimpleConsumerLease@3fa54755 due to org.apache.nifi.processor.exception.ProcessException: org.apache.nifi.serialization.SchemaValidationException: Field designation cannot be null: org.apache.nifi.processor.exception.ProcessException: org.apache.nifi.serialization.SchemaValidationException: Field designation cannot be null
> org.apache.nifi.processor.exception.ProcessException: org.apache.nifi.serialization.SchemaValidationException: Field designation cannot be null
> 	at org.apache.nifi.processors.kafka.pubsub.ConsumerLease.writeRecordData(ConsumerLease.java:587)
> 	at org.apache.nifi.processors.kafka.pubsub.ConsumerLease.lambda$processRecords$2(ConsumerLease.java:330)
> 	at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1553)
> 	at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:580)
> 	at org.apache.nifi.processors.kafka.pubsub.ConsumerLease.processRecords(ConsumerLease.java:317)
> 	at org.apache.nifi.processors.kafka.pubsub.ConsumerLease.poll(ConsumerLease.java:178)
> 	at org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_1_0.onTrigger(ConsumeKafkaRecord_1_0.java:378)
> 	at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
> 	at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1165)
> 	at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:203)
> 	at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:117)
> 	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> 	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
> 	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
> 	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> 	at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.nifi.serialization.SchemaValidationException: Field designation cannot be null
> 	at org.apache.nifi.serialization.record.MapRecord.checkTypes(MapRecord.java:81)
> 	at org.apache.nifi.serialization.record.MapRecord.<init>(MapRecord.java:52)
> 	at org.apache.nifi.csv.CSVRecordReader.nextRecord(CSVRecordReader.java:113)
> 	at org.apache.nifi.serialization.RecordReader.nextRecord(RecordReader.java:50)
> 	at org.apache.nifi.processors.kafka.pubsub.ConsumerLease.writeRecordData(ConsumerLease.java:534)
> 	... 17 common frames omitted
> {code}
> In such a case, the processor will constantly roll back the session and
> re-poll the same data, with the exception recurring on every attempt, so the
> processor stops making progress on the topic.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)