[
https://issues.apache.org/jira/browse/NIFI-5667?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16644217#comment-16644217
]
ASF GitHub Bot commented on NIFI-5667:
--------------------------------------
Github user mattyb149 commented on a diff in the pull request:
https://github.com/apache/nifi/pull/3057#discussion_r223893228
--- Diff:
nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/orc/PutORC.java
---
@@ -157,19 +155,17 @@ public String getDefaultCompressionType(final ProcessorInitializationContext con
     public HDFSRecordWriter createHDFSRecordWriter(final ProcessContext context, final FlowFile flowFile, final Configuration conf, final Path path, final RecordSchema schema)
             throws IOException, SchemaNotFoundException {
-        final Schema avroSchema = AvroTypeUtil.extractAvroSchema(schema);
-
         final long stripeSize = context.getProperty(STRIPE_SIZE).asDataSize(DataUnit.B).longValue();
         final int bufferSize = context.getProperty(BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
         final CompressionKind compressionType = CompressionKind.valueOf(context.getProperty(COMPRESSION_TYPE).getValue());
         final boolean normalizeForHive = context.getProperty(HIVE_FIELD_NAMES).asBoolean();
-        TypeInfo orcSchema = NiFiOrcUtils.getOrcField(avroSchema, normalizeForHive);
+        TypeInfo orcSchema = NiFiOrcUtils.getOrcSchema(schema, normalizeForHive);
         final Writer orcWriter = NiFiOrcUtils.createWriter(path, conf, orcSchema, stripeSize, compressionType, bufferSize);
         final String hiveTableName = context.getProperty(HIVE_TABLE_NAME).isSet()
                 ? context.getProperty(HIVE_TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue()
-                : NiFiOrcUtils.normalizeHiveTableName(avroSchema.getFullName());
+                : NiFiOrcUtils.normalizeHiveTableName(schema.toString());// TODO
--- End diff --
I admit I hadn't tested this part. The TODO should be removed, but we likely
need a way to get at the "name" of the top-level record when the Hive Table
Name property is not set. Then again, I haven't seen anyone rely on the
schema's full name as the table name; the Hive Table Name property is the
recommended way to set this for the generated DDL. All comments welcome, though :)
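To make the fallback question concrete, here is a minimal sketch of the lookup order discussed above: use the configured Hive table name when there is one, otherwise fall back to a normalized top-level record name. Everything in it is hypothetical: `TableNameResolver`, `resolve` and `normalize` are illustrative names rather than NiFi API, the record name is simply passed in as a string because how to obtain it from the schema is exactly the open question, and the normalization rule (dots and spaces become underscores) is an assumption, not a statement of what `NiFiOrcUtils.normalizeHiveTableName` does.

```java
import java.util.Optional;

/**
 * Sketch only: hypothetical helper, not part of NiFi.
 * Resolves a Hive table name from an explicitly configured value,
 * falling back to the (normalized) name of the top-level record.
 */
final class TableNameResolver {

    private TableNameResolver() {
    }

    /** Assumed normalization: replace dots and spaces with underscores. */
    static String normalize(final String name) {
        return name.replaceAll("[. ]", "_");
    }

    /**
     * @param configuredName value of the table-name property, or null if unset
     * @param recordName     name of the top-level record, or null if unknown
     * @return the table name to use, or empty if neither source is available
     */
    static Optional<String> resolve(final String configuredName, final String recordName) {
        if (configuredName != null && !configuredName.trim().isEmpty()) {
            // An explicitly configured name is used as-is.
            return Optional.of(configuredName.trim());
        }
        if (recordName != null && !recordName.trim().isEmpty()) {
            return Optional.of(normalize(recordName.trim()));
        }
        // No usable name: let the caller decide (skip the DDL, fail, ...).
        return Optional.empty();
    }

    public static void main(final String[] args) {
        System.out.println(resolve(null, "analytics.models.test.nifi_hive3_test").orElse("<none>"));
        System.out.println(resolve("my_table", "analytics.models.test.nifi_hive3_test").orElse("<none>"));
    }
}
```

Returning an empty `Optional` instead of a placeholder string keeps the "no name available" case explicit, so the caller can choose whether to omit the generated DDL or report an error.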
> Hive3 PutOrc processors, error when using nestled Avro Record types
> -------------------------------------------------------------------
>
> Key: NIFI-5667
> URL: https://issues.apache.org/jira/browse/NIFI-5667
> Project: Apache NiFi
> Issue Type: Bug
> Components: Extensions
> Affects Versions: 1.8.0, 1.7.1
> Environment: Centos 7 and Docker Image from Hortonworks
> Reporter: Viking Karstorp
> Assignee: Matt Burgess
> Priority: Major
>
> I have been testing the new PutOrc processor that was introduced in 1.7
> to see if I can replace the ConvertAvroToOrc processor I currently use.
> When I sent some of the complex Avro messages in my flow through it, I
> encountered the following error (see the full stack trace further down):
> java.lang.IllegalArgumentException: Error converting object of type
> org.apache.nifi.serialization.record.MapRecord to ORC type
> The older ConvertAvroToOrc processor processed the flowfile without issues.
> Note also that the PutOrc processor handles the flowfile fine if it contains
> only the schema and no Avro data. The problem seems to be related to nested
> "Record" types.
> How to reproduce:
> Avro schema: bug.avsc
> {code}
> {
>   "name": "nifi_hive3_test",
>   "namespace": "analytics.models.test",
>   "type": "record",
>   "fields": [
>     {
>       "name": "Serial",
>       "type": {
>         "name": "Serial",
>         "namespace": "analytics.models.common.serial",
>         "type": "record",
>         "fields": [
>           {
>             "name": "Serial",
>             "type": "long"
>           }
>         ]
>       }
>     }
>   ]
> }
> {code}
> Small python script to create an Avro file.
> {code}
> import avro.schema
> from avro.datafile import DataFileReader, DataFileWriter
> from avro.io import DatumReader, DatumWriter
> schema = avro.schema.parse(open("bug.avsc", "rb").read())
> writer = DataFileWriter(open("bug.avro", "wb"), DatumWriter(), schema)
> writer.append({'Serial': {'Serial': 11088000000001615L}})
> writer.close()
> # Print what was written to the Avro file (Python 2 print statement)
> reader1 = DataFileReader(open("bug.avro", "rb"), DatumReader())
> for user in reader1:
>     print user
> {code}
> Then just load the Avro file using ListFile -> FetchFile
> Full error message:
> {code}
> 2018-10-06 15:54:10,201 ERROR [Timer-Driven Process Thread-8] org.apache.nifi.processors.orc.PutORC PutORC[id=8be207cb-b16e-3578-1765-1c9e0c0aa383] Failed to write due to java.lang.IllegalArgumentException: Error converting object of type org.apache.nifi.serialization.record.MapRecord to ORC type struct<serial:bigint>: java.lang.IllegalArgumentException: Error converting object of type org.apache.nifi.serialization.record.MapRecord to ORC type struct<serial:bigint>
> java.lang.IllegalArgumentException: Error converting object of type org.apache.nifi.serialization.record.MapRecord to ORC type struct<serial:bigint>
> at org.apache.hadoop.hive.ql.io.orc.NiFiOrcUtils.convertToORCObject(NiFiOrcUtils.java:206)
> at org.apache.nifi.processors.orc.record.ORCHDFSRecordWriter.write(ORCHDFSRecordWriter.java:71)
> at org.apache.nifi.processors.orc.record.ORCHDFSRecordWriter.write(ORCHDFSRecordWriter.java:91)
> at org.apache.nifi.processors.hadoop.AbstractPutHDFSRecord.lambda$null$0(AbstractPutHDFSRecord.java:324)
> at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:2218)
> at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:2186)
> at org.apache.nifi.processors.hadoop.AbstractPutHDFSRecord.lambda$onTrigger$1(AbstractPutHDFSRecord.java:305)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:360)
> at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1662)
> at org.apache.nifi.processors.hadoop.AbstractPutHDFSRecord.onTrigger(AbstractPutHDFSRecord.java:272)
> at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
> at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1165)
> at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:203)
> at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:117)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
> at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
> at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> {code}
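The trace above fails while converting a record value whose target ORC type is a struct, which suggests the nested-record case needs to be handled recursively. As an illustration only, the sketch below shows that recursion on plain `Map` values standing in for NiFi records: each nested map becomes a nested list of field values in declaration order. `NestedRecordSketch` and `toStruct` are hypothetical names, not NiFi or Hive API, and this is not the code changed in the pull request.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Sketch only: plain Maps stand in for records, Lists stand in for structs.
 * Hypothetical helper, not part of NiFi or Hive.
 */
final class NestedRecordSketch {

    private NestedRecordSketch() {
    }

    /** Converts a record into an ordered list of field values, recursing into nested records. */
    static List<Object> toStruct(final Map<String, Object> record) {
        final List<Object> struct = new ArrayList<>();
        for (final Object value : record.values()) {
            if (value instanceof Map) {
                // Nested record: convert it to a nested struct instead of failing.
                @SuppressWarnings("unchecked")
                final Map<String, Object> nested = (Map<String, Object>) value;
                struct.add(toStruct(nested));
            } else {
                struct.add(value);
            }
        }
        return struct;
    }

    public static void main(final String[] args) {
        // Same shape as the record in the reproduction: {"Serial": {"Serial": <long>}}
        final Map<String, Object> inner = new LinkedHashMap<>();
        inner.put("Serial", 11088000000001615L);
        final Map<String, Object> outer = new LinkedHashMap<>();
        outer.put("Serial", inner);
        System.out.println(toStruct(outer));
    }
}
```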
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)