[ 
https://issues.apache.org/jira/browse/CAMEL-19586?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Claus Ibsen resolved CAMEL-19586.
---------------------------------
    Resolution: Fixed

> camel-parquet-avro - Allow users to unmarshal Parquet file into Avro's 
> GenericRecords
> -------------------------------------------------------------------------------------
>
>                 Key: CAMEL-19586
>                 URL: https://issues.apache.org/jira/browse/CAMEL-19586
>             Project: Camel
>          Issue Type: Improvement
>            Reporter: Kengo Seki
>            Assignee: Kengo Seki
>            Priority: Major
>             Fix For: 4.0.0, 4.0-RC2
>
>
> Currently, the Parquet-Avro data format requires users to define a POJO class 
> for marshalling and unmarshalling. This is cumbersome, especially when 
> unmarshalling an existing Parquet file with a complicated data structure.
> [Avro provides GenericRecord for such 
> cases|https://avro.apache.org/docs/1.11.1/getting-started-java/#serializing-and-deserializing-without-code-generation],
>  but it doesn't work with the current unmarshaller, as the following example shows:
> {code}
> $ cat example.java
> ///usr/bin/env jbang "$0" "$@" ; exit $?
> //DEPS org.slf4j:slf4j-simple:1.7.31
> //DEPS org.apache.camel:camel-bom:4.0.0-M3@pom
> //DEPS org.apache.camel:camel-core
> //DEPS org.apache.camel:camel-main
> //DEPS org.apache.camel:camel-parquet-avro:4.0.0-SNAPSHOT
> //DEPS org.apache.hadoop:hadoop-client:3.3.6
> import org.apache.avro.generic.GenericRecord;
> import org.apache.camel.*;
> import org.apache.camel.builder.*;
> import org.apache.camel.dataformat.parquet.avro.*;
> import org.apache.camel.main.*;
> import org.apache.camel.spi.*;
> import static org.apache.camel.builder.PredicateBuilder.*;
> class example {
>     public static void main(String... args) throws Exception {
>         System.setProperty("org.slf4j.simpleLogger.logFile", "System.out");
>         Main main = new Main();
>         ParquetAvroDataFormat format = new ParquetAvroDataFormat();
>         format.setUnmarshalType(GenericRecord.class);
>         main.configure().addRoutesBuilder(new RouteBuilder() {
>             public void configure() throws Exception {
>                 from("file:/tmp?fileName=example1.parquet&noop=true")
>                     .unmarshal(format)
>                     .marshal(format)
>                     .log("${body}");
>             }
>         });
>         main.run();
>     }
> }
> $ jbang example.java
> ...
> [Camel (camel-1) thread #1 - file:///tmp] ERROR org.apache.camel.processor.errorhandler.DefaultErrorHandler - Failed delivery for (MessageId: 356E1287483C55C-0000000000000000 on ExchangeId: 356E1287483C55C-0000000000000000). Exhausted after delivery attempt: 1 caught: org.apache.avro.AvroRuntimeException: Not a Specific class: interface org.apache.avro.generic.GenericRecord
> Message History (source location and message history is disabled)
> ---------------------------------------------------------------------------------------------------------------------------------------
> Source                                   ID                             Processor                                          Elapsed (ms)
>                                          route1/route1                  from[file:///tmp?fileName=example1.parquet&noop=tr     89591203
>       ...
>                                          route1/marshal1                marshal[org.apache.camel.model.DataFormatDefinitio            0
> Stacktrace
> ---------------------------------------------------------------------------------------------------------------------------------------
> org.apache.avro.AvroRuntimeException: Not a Specific class: interface org.apache.avro.generic.GenericRecord
>       at org.apache.avro.specific.SpecificData.createSchema(SpecificData.java:403)
>       at org.apache.avro.reflect.ReflectData.createSchema(ReflectData.java:734)
>       at org.apache.avro.specific.SpecificData.lambda$new$3(SpecificData.java:337)
>       at org.apache.avro.util.internal.ClassValueCache$1.computeValue(ClassValueCache.java:35)
>       at java.base/java.lang.ClassValue.getFromHashMap(ClassValue.java:228)
>       at java.base/java.lang.ClassValue.getFromBackup(ClassValue.java:210)
>       at java.base/java.lang.ClassValue.get(ClassValue.java:116)
>       at org.apache.avro.util.internal.ClassValueCache.apply(ClassValueCache.java:45)
>       at org.apache.avro.specific.SpecificData.getSchema(SpecificData.java:346)
>       at org.apache.camel.dataformat.parquet.avro.ParquetAvroDataFormat.marshal(ParquetAvroDataFormat.java:70)
>       at org.apache.camel.support.processor.MarshalProcessor.process(MarshalProcessor.java:64)
>       at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$SimpleTask.run(RedeliveryErrorHandler.java:475)
>       at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:181)
>       at org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleMain(DefaultReactiveExecutor.java:59)
>       at org.apache.camel.processor.Pipeline.process(Pipeline.java:164)
>       at org.apache.camel.impl.engine.CamelInternalProcessor.process(CamelInternalProcessor.java:379)
>       at org.apache.camel.component.file.GenericFileConsumer.processExchange(GenericFileConsumer.java:491)
>       at org.apache.camel.component.file.GenericFileConsumer.processBatch(GenericFileConsumer.java:244)
>       at org.apache.camel.component.file.GenericFileConsumer.poll(GenericFileConsumer.java:205)
>       at org.apache.camel.support.ScheduledPollConsumer.doRun(ScheduledPollConsumer.java:202)
>       at org.apache.camel.support.ScheduledPollConsumer.run(ScheduledPollConsumer.java:116)
>       at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
>       at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
>       at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
>       at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
>       at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
>       at java.base/java.lang.Thread.run(Thread.java:833)
> {code}
> I would therefore like to propose a new feature that unmarshals Parquet data 
> into Avro's GenericRecord (and marshals it back) when no POJO class is 
> specified as unmarshalType.
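
The proposed fallback amounts to a small dispatch on whether an unmarshal type has been configured. The following is a minimal sketch in plain Java, not Camel's actual implementation: the class UnmarshalModeSketch, the enum Mode, and the method select are hypothetical names introduced only for illustration, and no Avro or Parquet classes are involved.

```java
/** Hypothetical model of the proposed fallback; not Camel's actual code. */
class UnmarshalModeSketch {

    /** Illustrative labels for the data model a reader or writer would use. */
    enum Mode { GENERIC_RECORD, REFLECT_POJO }

    /**
     * Picks the generic model when no POJO class is configured,
     * and the POJO-based model otherwise.
     */
    static Mode select(Class<?> unmarshalType) {
        return unmarshalType == null ? Mode.GENERIC_RECORD : Mode.REFLECT_POJO;
    }

    /** Stand-in for a user-defined POJO class. */
    static class Pojo { }

    public static void main(String[] args) {
        // No unmarshalType configured: fall back to generic records.
        System.out.println(select(null));
        // A POJO class is configured: keep the existing POJO-based behaviour.
        System.out.println(select(Pojo.class));
    }
}
```

Under this reading, the reproduction above would simply omit the setUnmarshalType call instead of passing GenericRecord.class, which is the call that leads to the "Not a Specific class" error.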



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
