psolomin commented on issue #19066:
URL: https://github.com/apache/beam/issues/19066#issuecomment-1752180280
I would like to ask a follow-up question on this: `ParquetIO`, being hard-coded to `GenericRecord`, seems to be practically unusable for logical types, because I did not find a coder for it which actually supports Avro logical types. Am I mistaken?

Related issue: https://github.com/apache/beam/issues/18874

Related PR which adds support for logical types: https://github.com/apache/beam/pull/26320

Here's an example of a pipeline which reads Avro and writes Parquet files:

```
... reading byte arrays from source ...
.apply("Parse payloads", ParDo.of(new ConsumedEventDeserializer()))
.setCoder(AvroCoder.of(GenericRecord.class, ConsumedEvent.SCHEMA$))
.apply(
    "Sink to S3",
    FileIO.<GenericRecord>write()
        .via(ParquetIO.sink(ConsumedEvent.SCHEMA$)
            .withCompressionCodec(CompressionCodecName.SNAPPY))
        .to(opts.getSinkLocation())
        .withNaming(new NoColonFileNaming(runId)));
```

But it produces an exception:

```
Caused by: java.lang.ClassCastException: class java.time.Instant cannot be cast to class java.lang.Number (java.time.Instant and java.lang.Number are in module java.base of loader 'bootstrap')
    at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:160)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:81)
    at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:221)
    at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:210)
    at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:131)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:83)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:73)
    at org.apache.beam.sdk.extensions.avro.coders.AvroCoder.encode(AvroCoder.java:378)
    at org.apache.beam.sdk.coders.Coder.encode(Coder.java:132)
    at org.apache.beam.sdk.util.CoderUtils.encodeToSafeStream(CoderUtils.java:86)
    at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:70)
    at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:55)
    at org.apache.beam.sdk.util.CoderUtils.clone(CoderUtils.java:168)
    at org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.<init>(MutationDetectors.java:118)
    at org.apache.beam.sdk.util.MutationDetectors.forValueWithCoder(MutationDetectors.java:49)
    at org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.add(ImmutabilityCheckingBundleFactory.java:115)
    at org.apache.beam.runners.direct.ParDoEvaluator$BundleOutputManager.output(ParDoEvaluator.java:305)
    at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:275)
    at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:85)
    at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:423)
    at org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:76)
    at com.psolomin.consumer.ConsumedEventDeserializer.processElement(ConsumedEventDeserializer.java:34)
```

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
