[ https://issues.apache.org/jira/browse/CAMEL-19586?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Claus Ibsen updated CAMEL-19586:
--------------------------------
Fix Version/s: 4.0.0, 4.0-RC2
> camel-parquet-avro - Allow users to unmarshal Parquet file into Avro's GenericRecords
> -------------------------------------------------------------------------------------
>
> Key: CAMEL-19586
> URL: https://issues.apache.org/jira/browse/CAMEL-19586
> Project: Camel
> Issue Type: Improvement
> Reporter: Kengo Seki
> Assignee: Kengo Seki
> Priority: Major
> Fix For: 4.0.0, 4.0-RC2
>
>
> Currently, the Parquet-Avro data format requires users to define a POJO class
> for marshalling and unmarshalling. This is cumbersome, especially when
> unmarshalling an existing Parquet file with a complicated data structure.
> [Avro provides GenericRecord for such
> cases|https://avro.apache.org/docs/1.11.1/getting-started-java/#serializing-and-deserializing-without-code-generation],
> but the current data format does not support it yet, as the following example shows:
> {code}
> $ cat example.java
> ///usr/bin/env jbang "$0" "$@" ; exit $?
> //DEPS org.slf4j:slf4j-simple:1.7.31
> //DEPS org.apache.camel:camel-bom:4.0.0-M3@pom
> //DEPS org.apache.camel:camel-core
> //DEPS org.apache.camel:camel-main
> //DEPS org.apache.camel:camel-parquet-avro:4.0.0-SNAPSHOT
> //DEPS org.apache.hadoop:hadoop-client:3.3.6
> import org.apache.avro.generic.GenericRecord;
> import org.apache.camel.*;
> import org.apache.camel.builder.*;
> import org.apache.camel.dataformat.parquet.avro.*;
> import org.apache.camel.main.*;
> import org.apache.camel.spi.*;
> import static org.apache.camel.builder.PredicateBuilder.*;
> class example {
>     public static void main(String... args) throws Exception {
>         System.setProperty("org.slf4j.simpleLogger.logFile", "System.out");
>         Main main = new Main();
>         ParquetAvroDataFormat format = new ParquetAvroDataFormat();
>         format.setUnmarshalType(GenericRecord.class);
>         main.configure().addRoutesBuilder(new RouteBuilder() {
>             public void configure() throws Exception {
>                 from("file:/tmp?fileName=example1.parquet&noop=true")
>                     .unmarshal(format)
>                     .marshal(format)
>                     .log("${body}");
>             }
>         });
>         main.run();
>     }
> }
> $ jbang example.java
> ...
> [Camel (camel-1) thread #1 - file:///tmp] ERROR org.apache.camel.processor.errorhandler.DefaultErrorHandler - Failed delivery for (MessageId: 356E1287483C55C-0000000000000000 on ExchangeId: 356E1287483C55C-0000000000000000). Exhausted after delivery attempt: 1 caught: org.apache.avro.AvroRuntimeException: Not a Specific class: interface org.apache.avro.generic.GenericRecord
> Message History (source location and message history is disabled)
> ---------------------------------------------------------------------------------------------------------------------------------------
> Source  ID               Processor                                           Elapsed (ms)
>         route1/route1    from[file:///tmp?fileName=example1.parquet&noop=tr  89591203
> ...
>         route1/marshal1  marshal[org.apache.camel.model.DataFormatDefinitio  0
> Stacktrace
> ---------------------------------------------------------------------------------------------------------------------------------------
> org.apache.avro.AvroRuntimeException: Not a Specific class: interface org.apache.avro.generic.GenericRecord
> at org.apache.avro.specific.SpecificData.createSchema(SpecificData.java:403)
> at org.apache.avro.reflect.ReflectData.createSchema(ReflectData.java:734)
> at org.apache.avro.specific.SpecificData.lambda$new$3(SpecificData.java:337)
> at org.apache.avro.util.internal.ClassValueCache$1.computeValue(ClassValueCache.java:35)
> at java.base/java.lang.ClassValue.getFromHashMap(ClassValue.java:228)
> at java.base/java.lang.ClassValue.getFromBackup(ClassValue.java:210)
> at java.base/java.lang.ClassValue.get(ClassValue.java:116)
> at org.apache.avro.util.internal.ClassValueCache.apply(ClassValueCache.java:45)
> at org.apache.avro.specific.SpecificData.getSchema(SpecificData.java:346)
> at org.apache.camel.dataformat.parquet.avro.ParquetAvroDataFormat.marshal(ParquetAvroDataFormat.java:70)
> at org.apache.camel.support.processor.MarshalProcessor.process(MarshalProcessor.java:64)
> at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$SimpleTask.run(RedeliveryErrorHandler.java:475)
> at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:181)
> at org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleMain(DefaultReactiveExecutor.java:59)
> at org.apache.camel.processor.Pipeline.process(Pipeline.java:164)
> at org.apache.camel.impl.engine.CamelInternalProcessor.process(CamelInternalProcessor.java:379)
> at org.apache.camel.component.file.GenericFileConsumer.processExchange(GenericFileConsumer.java:491)
> at org.apache.camel.component.file.GenericFileConsumer.processBatch(GenericFileConsumer.java:244)
> at org.apache.camel.component.file.GenericFileConsumer.poll(GenericFileConsumer.java:205)
> at org.apache.camel.support.ScheduledPollConsumer.doRun(ScheduledPollConsumer.java:202)
> at org.apache.camel.support.ScheduledPollConsumer.run(ScheduledPollConsumer.java:116)
> at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
> at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
> at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
> at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
> at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
> at java.base/java.lang.Thread.run(Thread.java:833)
> {code}
> So I'd like to propose a new feature: unmarshal Parquet data into Avro's
> GenericRecord when no POJO class is specified as unmarshalType, and marshal
> GenericRecords back into Parquet.
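The proposed rule can be sketched as a small decision function. This is a hypothetical, stdlib-only illustration, not Camel's implementation: ReadModeSketch, ReadMode, chooseReadMode and ExamplePojo are invented names, and GenericRecord is recognised by its class name only so that the sketch compiles without Avro on the classpath. It assumes that in generic mode the schema would come from the data itself rather than being derived from a Java class, which is the step that currently fails with "Not a Specific class" (SpecificData.getSchema in the trace above).

```java
// Hypothetical sketch: ReadModeSketch, ReadMode, chooseReadMode and ExamplePojo
// are illustrative names, not part of Camel's ParquetAvroDataFormat API.
class ReadModeSketch {

    enum ReadMode {
        GENERIC, // unmarshal into org.apache.avro.generic.GenericRecord
        REFLECT  // unmarshal into the configured POJO class
    }

    // Compared by name (assumption) so the sketch compiles without Avro on the classpath.
    static final String GENERIC_RECORD = "org.apache.avro.generic.GenericRecord";

    // Stand-in for a user-defined POJO, as the current data format requires.
    static class ExamplePojo {
        String name;
        int count;
    }

    static ReadMode chooseReadMode(Class<?> unmarshalType) {
        // No POJO configured, or GenericRecord requested explicitly: produce
        // generic records instead of asking Avro to derive a schema from the
        // class, which is what throws "Not a Specific class" today.
        if (unmarshalType == null || GENERIC_RECORD.equals(unmarshalType.getName())) {
            return ReadMode.GENERIC;
        }
        // A concrete POJO class: keep the existing reflection-based behaviour.
        return ReadMode.REFLECT;
    }

    public static void main(String[] args) {
        System.out.println(chooseReadMode(null));              // GENERIC
        System.out.println(chooseReadMode(ExamplePojo.class)); // REFLECT
    }
}
```

In this sketch, routes that configure a concrete POJO class keep the reflection path, and only a missing unmarshalType or an explicit GenericRecord switches to generic records.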