[
https://issues.apache.org/jira/browse/NIFI-1403?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15193224#comment-15193224
]
Toivo Adams commented on NIFI-1403:
-----------------------------------
I created a test and did some investigation.
ConvertCSVToAvro uses the Kite SDK to convert CSV to Avro.
More specifically, it uses CSVFileReader, which in turn uses CSVRecordBuilder.
The test fails:
2016-03-14 14:42:16,900 (pool-1-thread-1) [DEBUG - org.kitesdk.data.spi.filesystem.CSVFileReader.close(CSVFileReader.java:187)] Closing reader on path:null
2016-03-14 14:42:16,904 (pool-1-thread-1) [ERROR - org.apache.nifi.util.CapturingLogger.error(CapturingLogger.java:375)] [ConvertCSVToAvro[id=8dcac16f-34e8-4293-9c90-08411876aa1d]] Failed to read FlowFile
org.kitesdk.data.DatasetOperationException: Unsupported field type:RECORD
at org.kitesdk.data.spi.filesystem.CSVRecordBuilder.makeValue(CSVRecordBuilder.java:188)
at org.kitesdk.data.spi.filesystem.CSVRecordBuilder.makeValue(CSVRecordBuilder.java:113)
at org.kitesdk.data.spi.filesystem.CSVRecordBuilder.fillIndexed(CSVRecordBuilder.java:97)
The failure occurs at CSVRecordBuilder.java:188:
default:
// FIXED, BYTES, MAP, ARRAY, RECORD are not supported
throw new DatasetOperationException(
"Unsupported field type:" + schema.getType());
It seems CSVRecordBuilder does not support nested records (Avro fields of type RECORD).
Possible solutions:
1. Somehow force CSVFileReader and CSVRecordBuilder to accept nested
records. How?
2. Use something other than CSVFileReader.
3. Accept that ConvertCSVToAvro does not support nested records.
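For option 1, here is a minimal sketch of one way a flat CSV row could be mapped onto a nested record: walk the schema depth-first and let each nested record consume the next columns. This is not Kite's API. The Field class, the type names and the fill method are hypothetical stand-ins for the Avro Schema and CSVRecordBuilder, and the ';' delimiter and column order are assumed from the example in the issue.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class NestedCsvSketch {

    /** Hypothetical stand-in for an Avro field: a name plus a primitive type or nested fields. */
    static final class Field {
        final String name;
        final String type;          // "string", "long" or "record" (assumed subset)
        final List<Field> children; // only used when type is "record"

        Field(String name, String type, Field... children) {
            this.name = name;
            this.type = type;
            this.children = Arrays.asList(children);
        }
    }

    /** Fills a record from a flat row, consuming columns in depth-first schema order. */
    static Map<String, Object> fill(List<Field> fields, String[] columns) {
        int[] next = {0}; // index of the next unread column, shared across recursion
        Map<String, Object> record = fillRecord(fields, columns, next);
        if (next[0] != columns.length) {
            throw new IllegalArgumentException("Unused CSV columns: " + (columns.length - next[0]));
        }
        return record;
    }

    private static Map<String, Object> fillRecord(List<Field> fields, String[] columns, int[] next) {
        Map<String, Object> record = new LinkedHashMap<>();
        for (Field field : fields) {
            record.put(field.name, makeValue(field, columns, next));
        }
        return record;
    }

    private static Object makeValue(Field field, String[] columns, int[] next) {
        switch (field.type) {
            case "record":
                // Instead of rejecting RECORD, recurse and keep reading from the same row.
                return fillRecord(field.children, columns, next);
            case "string":
                return nextColumn(field, columns, next);
            case "long":
                return Long.parseLong(nextColumn(field, columns, next));
            default:
                throw new IllegalArgumentException("Unsupported field type:" + field.type);
        }
    }

    private static String nextColumn(Field field, String[] columns, int[] next) {
        if (next[0] >= columns.length) {
            throw new IllegalArgumentException("No CSV column left for field " + field.name);
        }
        return columns[next[0]++];
    }

    public static void main(String[] args) {
        // Schema and row taken from the issue description.
        List<Field> observation = Arrays.asList(
                new Field("metric", "string"),
                new Field("timestamp", "long"),
                new Field("value", "string"),
                new Field("tags", "record",
                        new Field("sensor", "string"),
                        new Field("depth", "string")));
        String[] row = "pressure;1452685093;0.87;node01;3543.33".split(";");
        System.out.println(fill(observation, row));
    }
}
```

With the row from the issue, the last two columns end up inside "tags". Whether the real CSVRecordBuilder could be extended this way (for example with respect to header-based column lookup) would still need to be checked.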
Thanks
Toivo
> Avro schema with nested records seems to be not correctly managed by
> ConvertCSV2AVRO
> ------------------------------------------------------------------------------------
>
> Key: NIFI-1403
> URL: https://issues.apache.org/jira/browse/NIFI-1403
> Project: Apache NiFi
> Issue Type: Bug
> Components: Extensions
> Affects Versions: 0.4.0
> Reporter: Massimiliano Nigrelli
>
> I have set up a flow where a .csv file (fakeCSV.csv) is read from file system
> and is then "passed" to a ConvertCSV2AVRO processor, which will forward the
> flow file to a ConvertAVRO2JSON processor (which finally writes on a Kafka
> queue via a PutKafka processor)
> The .CSV file is:
> pressure;1452685093;0.87;node01;3543.33
> The AVRO Schema is:
> {
> "name": "observation",
> "type": "record",
> "fields": [
> {"name": "metric", "type": "string"},
> {"name": "timestamp", "type": "long"},
> {"name": "value", "type": "string"},
> {"name": "tags", "type": {
> "name": "tagRecord",
> "type": "record",
> "fields": [
> {"name": "sensor", "type": "string"},
> {"name": "depth", "type": "string"}
> ]
> }
> }
> ]
> }
> I expect the following JSON "out" on the Kafka queue:
> {
> "metric": "pressure",
> "timestamp": 1452685093,
> "value": 0.87,
> "tags": {
> "sensor": "node01",
> "depth": "3543.33"
> }
> }
> When the flow is activated, the processor seems to be stuck, returning the
> following error:
> ERROR [Timer-Driven Process Thread-4] o.a.n.processors.kite.ConvertCSVToAvro
> ConvertCSVToAvro[id=5526e2a4-72ae-46f5-8249-8694fa4d06f3] Failed to process
> session due to org.apache.nifi.processor.exception.FlowFileHandlingException:
> StandardFlowFileRecord[uuid=fbbbab6c-e2b0-43c1-9fe4-335eb5e6ad85,claim=StandardContentClaim
> [resourceClaim=StandardResourceClaim[id=1452867190110-331,
> container=default, section=331], offset=755419,
> length=41],offset=0,name=fakeCSV.csv,size=41] transfer relationship not
> specified: org.apache.nifi.processor.exception.FlowFileHandlingException:
> StandardFlowFileRecord[uuid=fbbbab6c-e2b0-43c1-9fe4-335eb5e6ad85,claim=StandardContentClaim
> [resourceClaim=StandardResourceClaim[id=1452867190110-331,
> container=default, section=331], offset=755419,
> length=41],offset=0,name=fakeCSV.csv,size=41] transfer relationship not
> specified
> Just to give you more details, if I do not add "nested" records in the AVRO
> schema, it works perfectly!
> Hope you will sort the issue out.
> Regards,
> MaxNigrelli72