Kengo Seki created CAMEL-19586:
----------------------------------
Summary: camel-parquet-avro - Allow users to unmarshal Parquet file into Avro's GenericRecords
Key: CAMEL-19586
URL: https://issues.apache.org/jira/browse/CAMEL-19586
Project: Camel
Issue Type: Improvement
Reporter: Kengo Seki
Assignee: Kengo Seki
Currently, the Parquet-Avro data format requires users to define a POJO class
for marshalling and unmarshalling. This is inconvenient, especially when
unmarshalling an existing Parquet file with a complicated data structure.
[Avro provides GenericRecord for such cases|https://avro.apache.org/docs/1.11.1/getting-started-java/#serializing-and-deserializing-without-code-generation],
but the current data format does not support it, as the following example shows:
{code}
$ cat example.java
///usr/bin/env jbang "$0" "$@" ; exit $?
//DEPS org.slf4j:slf4j-simple:1.7.31
//DEPS org.apache.camel:camel-bom:4.0.0-M3@pom
//DEPS org.apache.camel:camel-core
//DEPS org.apache.camel:camel-main
//DEPS org.apache.camel:camel-parquet-avro:4.0.0-SNAPSHOT
//DEPS org.apache.hadoop:hadoop-client:3.3.6
import org.apache.avro.generic.GenericRecord;
import org.apache.camel.*;
import org.apache.camel.builder.*;
import org.apache.camel.dataformat.parquet.avro.*;
import org.apache.camel.main.*;
import org.apache.camel.spi.*;
import static org.apache.camel.builder.PredicateBuilder.*;
class example {
    public static void main(String... args) throws Exception {
        System.setProperty("org.slf4j.simpleLogger.logFile", "System.out");
        Main main = new Main();
        ParquetAvroDataFormat format = new ParquetAvroDataFormat();
        format.setUnmarshalType(GenericRecord.class);
        main.configure().addRoutesBuilder(new RouteBuilder() {
            public void configure() throws Exception {
                from("file:/tmp?fileName=example1.parquet&noop=true")
                    .unmarshal(format)
                    .marshal(format)
                    .log("${body}");
            }
        });
        main.run();
    }
}
$ jbang example.java
...
[Camel (camel-1) thread #1 - file:///tmp] ERROR org.apache.camel.processor.errorhandler.DefaultErrorHandler - Failed delivery for (MessageId: 356E1287483C55C-0000000000000000 on ExchangeId: 356E1287483C55C-0000000000000000). Exhausted after delivery attempt: 1 caught: org.apache.avro.AvroRuntimeException: Not a Specific class: interface org.apache.avro.generic.GenericRecord
Message History (source location and message history is disabled)
---------------------------------------------------------------------------------------------------------------------------------------
Source                                   ID                             Processor                                          Elapsed (ms)
                                         route1/route1                  from[file:///tmp?fileName=example1.parquet&noop=tr     89591203
...
                                         route1/marshal1                marshal[org.apache.camel.model.DataFormatDefinitio            0
Stacktrace
---------------------------------------------------------------------------------------------------------------------------------------
org.apache.avro.AvroRuntimeException: Not a Specific class: interface org.apache.avro.generic.GenericRecord
	at org.apache.avro.specific.SpecificData.createSchema(SpecificData.java:403)
	at org.apache.avro.reflect.ReflectData.createSchema(ReflectData.java:734)
	at org.apache.avro.specific.SpecificData.lambda$new$3(SpecificData.java:337)
	at org.apache.avro.util.internal.ClassValueCache$1.computeValue(ClassValueCache.java:35)
	at java.base/java.lang.ClassValue.getFromHashMap(ClassValue.java:228)
	at java.base/java.lang.ClassValue.getFromBackup(ClassValue.java:210)
	at java.base/java.lang.ClassValue.get(ClassValue.java:116)
	at org.apache.avro.util.internal.ClassValueCache.apply(ClassValueCache.java:45)
	at org.apache.avro.specific.SpecificData.getSchema(SpecificData.java:346)
	at org.apache.camel.dataformat.parquet.avro.ParquetAvroDataFormat.marshal(ParquetAvroDataFormat.java:70)
	at org.apache.camel.support.processor.MarshalProcessor.process(MarshalProcessor.java:64)
	at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$SimpleTask.run(RedeliveryErrorHandler.java:475)
	at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:181)
	at org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleMain(DefaultReactiveExecutor.java:59)
	at org.apache.camel.processor.Pipeline.process(Pipeline.java:164)
	at org.apache.camel.impl.engine.CamelInternalProcessor.process(CamelInternalProcessor.java:379)
	at org.apache.camel.component.file.GenericFileConsumer.processExchange(GenericFileConsumer.java:491)
	at org.apache.camel.component.file.GenericFileConsumer.processBatch(GenericFileConsumer.java:244)
	at org.apache.camel.component.file.GenericFileConsumer.poll(GenericFileConsumer.java:205)
	at org.apache.camel.support.ScheduledPollConsumer.doRun(ScheduledPollConsumer.java:202)
	at org.apache.camel.support.ScheduledPollConsumer.run(ScheduledPollConsumer.java:116)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
	at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:833)
{code}
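According to the stack trace, the exception is raised in ParquetAvroDataFormat.marshal, where SpecificData.getSchema tries to derive an Avro schema from the GenericRecord interface itself. For comparison, parquet-avro on its own can already write and read GenericRecords with no POJO, as long as the schema is supplied explicitly when writing and is taken from the file when reading. The jbang script below is a minimal sketch of that round trip, not the data format's implementation; the parquet-avro 1.13.1 version, the class name GenericRoundTrip, the helper exampleSchema() and the "Example" record schema are assumptions made for illustration.

```java
///usr/bin/env jbang "$0" "$@" ; exit $?
//DEPS org.apache.parquet:parquet-avro:1.13.1
//DEPS org.apache.hadoop:hadoop-client:3.3.6
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.hadoop.util.HadoopOutputFile;

class GenericRoundTrip {
    // Hypothetical example schema; any Avro record schema would do.
    static Schema exampleSchema() {
        return SchemaBuilder.record("Example").fields()
                .requiredString("name")
                .requiredInt("count")
                .endRecord();
    }

    // Writes GenericRecords to a Parquet file, then reads them back as GenericRecords.
    static List<GenericRecord> roundTrip(String file, Schema schema, List<GenericRecord> records) throws Exception {
        Configuration conf = new Configuration();
        Path path = new Path(file);
        // Write side: the schema is passed in explicitly (it could equally come from
        // record.getSchema()) instead of being derived from a Java class.
        try (ParquetWriter<GenericRecord> writer = AvroParquetWriter
                .<GenericRecord>builder(HadoopOutputFile.fromPath(path, conf))
                .withSchema(schema)
                .withDataModel(GenericData.get())
                .withConf(conf)
                .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
                .build()) {
            for (GenericRecord record : records) {
                writer.write(record);
            }
        }
        // Read side: the Avro schema stored in the file drives decoding,
        // so no POJO or generated class is required.
        List<GenericRecord> result = new ArrayList<>();
        try (ParquetReader<GenericRecord> reader = AvroParquetReader
                .<GenericRecord>builder(HadoopInputFile.fromPath(path, conf))
                .withDataModel(GenericData.get())
                .withConf(conf)
                .build()) {
            GenericRecord record;
            while ((record = reader.read()) != null) {
                result.add(record);
            }
        }
        return result;
    }

    public static void main(String... args) throws Exception {
        Schema schema = exampleSchema();
        List<GenericRecord> input = List.of(
                new GenericRecordBuilder(schema).set("name", "a").set("count", 1).build(),
                new GenericRecordBuilder(schema).set("name", "b").set("count", 2).build());
        File tmp = File.createTempFile("generic-roundtrip", ".parquet");
        tmp.deleteOnExit();
        for (GenericRecord record : roundTrip(tmp.getAbsolutePath(), schema, input)) {
            System.out.println(record);
        }
    }
}
```

Saved as a .java file, it runs with jbang like the reproduction script above and prints the records read back from a temporary file.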
So I'd like to propose a new feature: unmarshal Parquet data into Avro's
GenericRecord (and marshal GenericRecords back to Parquet) when no POJO class is specified as unmarshalType.
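One way the proposal could be realised is sketched below, assuming the data format keeps Avro's reflect model for POJOs (the trace shows ReflectData.createSchema on the marshal path) and switches to the generic model otherwise. The helpers dataModelFor and schemaFor and the Point class are hypothetical names, not existing camel-parquet-avro API, and the avro 1.11.1 dependency is an assumption. The key difference from the failing path is that the write schema is taken from the GenericRecord instance instead of being derived from the GenericRecord interface.

```java
///usr/bin/env jbang "$0" "$@" ; exit $?
//DEPS org.apache.avro:avro:1.11.1
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericContainer;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.reflect.ReflectData;

class ModelSelection {
    // Hypothetical POJO standing in for a user-defined unmarshalType.
    static class Point {
        int x;
        int y;
    }

    // Hypothetical helper: choose the Avro data model from the configured unmarshalType.
    static GenericData dataModelFor(Class<?> unmarshalType) {
        // No POJO configured, or GenericRecord requested explicitly: use the generic model.
        if (unmarshalType == null || unmarshalType == GenericRecord.class) {
            return GenericData.get();
        }
        // A POJO was configured: keep mapping fields onto it by reflection.
        return ReflectData.get();
    }

    // Hypothetical helper: where marshal could obtain the write schema.
    static Schema schemaFor(Object body) {
        if (body instanceof GenericContainer) {
            // A GenericRecord (a GenericContainer) carries its own schema,
            // so no class introspection is needed.
            return ((GenericContainer) body).getSchema();
        }
        // POJO bodies: derive the schema from the class by reflection, the kind of
        // call that fails in the stack trace above when given the GenericRecord interface.
        return ReflectData.get().getSchema(body.getClass());
    }

    public static void main(String... args) {
        Schema schema = SchemaBuilder.record("Example").fields().requiredString("name").endRecord();
        GenericRecord record = new GenericData.Record(schema);
        record.put("name", "a");
        System.out.println(dataModelFor(null).getClass().getSimpleName());
        System.out.println(dataModelFor(Point.class).getClass().getSimpleName());
        System.out.println(schemaFor(record));
    }
}
```

Comparing unmarshalType with GenericRecord.class for equality, rather than using GenericRecord.class.isAssignableFrom(unmarshalType), is deliberate: Avro-generated specific classes also implement GenericRecord, so an assignability check would send them down the generic path as well.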