[ 
https://issues.apache.org/jira/browse/NIFI-5592?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16613852#comment-16613852
 ] 

ASF GitHub Bot commented on NIFI-5592:
--------------------------------------

GitHub user markap14 opened a pull request:

    https://github.com/apache/nifi/pull/3001

    NIFI-5592: If an Exception is thrown by RecordReader.read() from Cons…

    …umeKafkaRecord, route Record to parse.failure relationship
    
    Thank you for submitting a contribution to Apache NiFi.
    
    In order to streamline the review of the contribution we ask you
    to ensure the following steps have been taken:
    
    ### For all changes:
    - [ ] Is there a JIRA ticket associated with this PR? Is it referenced 
         in the commit message?
    
    - [ ] Does your PR title start with NIFI-XXXX where XXXX is the JIRA number 
you are trying to resolve? Pay particular attention to the hyphen "-" character.
    
    - [ ] Has your PR been rebased against the latest commit within the target 
branch (typically master)?
    
    - [ ] Is your initial contribution a single, squashed commit?
    
    ### For code changes:
    - [ ] Have you ensured that the full suite of tests is executed via mvn 
-Pcontrib-check clean install at the root nifi folder?
    - [ ] Have you written or updated unit tests to verify your changes?
    - [ ] If adding new dependencies to the code, are these dependencies 
licensed in a way that is compatible for inclusion under [ASF 
2.0](http://www.apache.org/legal/resolved.html#category-a)? 
    - [ ] If applicable, have you updated the LICENSE file, including the main 
LICENSE file under nifi-assembly?
    - [ ] If applicable, have you updated the NOTICE file, including the main 
NOTICE file found under nifi-assembly?
    - [ ] If adding new Properties, have you added .displayName in addition to 
.name (programmatic access) for each of the new properties?
    
    ### For documentation related changes:
    - [ ] Have you ensured that format looks appropriate for the output in 
which it is rendered?
    
    ### Note:
    Please ensure that once the PR is submitted, you check travis-ci for build 
issues and submit an update to your PR as soon as possible.


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/markap14/nifi NIFI-5592

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/nifi/pull/3001.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3001
    
----
commit 3c751e9556e400cd55e6b28bee8d73e9ff97db2b
Author: Mark Payne <markap14@...>
Date:   2018-09-13T17:56:13Z

    NIFI-5592: If an Exception is thrown by RecordReader.read() from 
ConsumeKafkaRecord, route Record to parse.failure relationship

----


> ConsumeKafkaRecord* processors can stop pulling data if the data doesn't 
> match the configured schema
> ----------------------------------------------------------------------------------------------------
>
>                 Key: NIFI-5592
>                 URL: https://issues.apache.org/jira/browse/NIFI-5592
>             Project: Apache NiFi
>          Issue Type: Bug
>          Components: Extensions
>            Reporter: Mark Payne
>            Assignee: Mark Payne
>            Priority: Major
>
> If the data in the kafka topic does not adhere to the configured schema, the 
> processor should route the data to 'parse.failure' but in some conditions, we 
> may encounter the following SchemaValidationException:
> {code}
> 2018-09-13 07:37:54,196 ERROR [Timer-Driven Process Thread-1] 
> o.a.n.p.k.pubsub.ConsumeKafkaRecord_1_0 
> ConsumeKafkaRecord_1_0[id=c258fa20-0165-1000-ffff-ffffb401d2c7] Exception 
> while processing data from kafka so will close the lease 
> org.apache.nifi.processors.kafka.pubsub.ConsumerPool$SimpleConsumerLease@3fa54755
>  due to org.apache.nifi.processor.exception.ProcessException: 
> org.apache.nifi.serialization.SchemaValidationException: Field designation 
> cannot be null: org.apache.nifi.processor.exception.ProcessException: 
> org.apache.nifi.serialization.SchemaValidationException: Field designation 
> cannot be null
> org.apache.nifi.processor.exception.ProcessException: 
> org.apache.nifi.serialization.SchemaValidationException: Field designation 
> cannot be null
>  at 
> org.apache.nifi.processors.kafka.pubsub.ConsumerLease.writeRecordData(ConsumerLease.java:587)
>  at 
> org.apache.nifi.processors.kafka.pubsub.ConsumerLease.lambda$processRecords$2(ConsumerLease.java:330)
>  at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1553)
>  at 
> java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:580)
>  at 
> org.apache.nifi.processors.kafka.pubsub.ConsumerLease.processRecords(ConsumerLease.java:317)
>  at 
> org.apache.nifi.processors.kafka.pubsub.ConsumerLease.poll(ConsumerLease.java:178)
>  at 
> org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_1_0.onTrigger(ConsumeKafkaRecord_1_0.java:378)
>  at 
> org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
>  at 
> org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1165)
>  at 
> org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:203)
>  at 
> org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:117)
>  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>  at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.nifi.serialization.SchemaValidationException: Field 
> designation cannot be null
>  at 
> org.apache.nifi.serialization.record.MapRecord.checkTypes(MapRecord.java:81)
>  at org.apache.nifi.serialization.record.MapRecord.<init>(MapRecord.java:52)
>  at org.apache.nifi.csv.CSVRecordReader.nextRecord(CSVRecordReader.java:113)
>  at 
> org.apache.nifi.serialization.RecordReader.nextRecord(RecordReader.java:50)
>  at 
> org.apache.nifi.processors.kafka.pubsub.ConsumerLease.writeRecordData(ConsumerLease.java:534)
>  ... 17 common frames omitted
> {code}
> In such a case, it will constantly roll back the session and keep trying to 
> pull the data, with the Exception continually occurring.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to