psolomin commented on issue #19066:
URL: https://github.com/apache/beam/issues/19066#issuecomment-1752180280

   A follow-up question on this: since `ParquetIO` is hard-coded to `GenericRecord`, it seems practically unusable with Avro logical types, because I could not find a `GenericRecord` coder that actually supports them. Am I mistaken?
   
   Related issue: https://github.com/apache/beam/issues/18874
   Related PR which adds support for logical types: 
https://github.com/apache/beam/pull/26320
   
   Here's an example of a pipeline which reads Avro and writes Parquet files:
   
   ```
   ... reading byte arrays from source ...
   
   .apply("Parse payloads", ParDo.of(new ConsumedEventDeserializer()))
   .setCoder(AvroCoder.of(GenericRecord.class, ConsumedEvent.SCHEMA$))
   .apply(
           "Sink to S3",
           FileIO.<GenericRecord>write()
                   .via(ParquetIO.sink(ConsumedEvent.SCHEMA$)
                           .withCompressionCodec(CompressionCodecName.SNAPPY))
                   .to(opts.getSinkLocation())
                   .withNaming(new NoColonFileNaming(runId)));
   ```
   
   But it produces an exception:
   
   ```
   Caused by: java.lang.ClassCastException: class java.time.Instant cannot be cast to class java.lang.Number (java.time.Instant and java.lang.Number are in module java.base of loader 'bootstrap')
        at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:160)
        at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:81)
        at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:221)
        at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:210)
        at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:131)
        at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:83)
        at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:73)
        at org.apache.beam.sdk.extensions.avro.coders.AvroCoder.encode(AvroCoder.java:378)
        at org.apache.beam.sdk.coders.Coder.encode(Coder.java:132)
        at org.apache.beam.sdk.util.CoderUtils.encodeToSafeStream(CoderUtils.java:86)
        at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:70)
        at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:55)
        at org.apache.beam.sdk.util.CoderUtils.clone(CoderUtils.java:168)
        at org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.<init>(MutationDetectors.java:118)
        at org.apache.beam.sdk.util.MutationDetectors.forValueWithCoder(MutationDetectors.java:49)
        at org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.add(ImmutabilityCheckingBundleFactory.java:115)
        at org.apache.beam.runners.direct.ParDoEvaluator$BundleOutputManager.output(ParDoEvaluator.java:305)
        at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:275)
        at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:85)
        at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:423)
        at org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:76)
        at com.psolomin.consumer.ConsumedEventDeserializer.processElement(ConsumedEventDeserializer.java:34)
   
   ```
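
The trace shows `AvroCoder.encode` failing inside `GenericDatumWriter.writeWithoutConversion` when it meets a `java.time.Instant` where it expects a `Number`. One possible workaround, sketched here with only the JDK, is to put the raw underlying values into the `GenericRecord` before emitting it from the `DoFn`. This is an assumption-laden sketch, not Beam or Avro API: `RawAvroValues`, `toRaw`, `toRawFields` and the field names `id` / `createdAt` are hypothetical, and the offending field is assumed to be `timestamp-millis` (Avro stores that as a `long` of epoch milliseconds, and `date` as an `int` of epoch days).

```java
import java.time.Instant;
import java.time.LocalDate;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: maps Java logical-type values to the primitives
// Avro uses as their underlying representation.
final class RawAvroValues {
    private RawAvroValues() {}

    /** Returns the raw value for known logical types, or the value unchanged. */
    static Object toRaw(Object value) {
        if (value instanceof Instant) {
            // Assumption: the field is timestamp-millis. A timestamp-micros
            // field would need microseconds since the epoch instead.
            return ((Instant) value).toEpochMilli();
        }
        if (value instanceof LocalDate) {
            // Avro `date` is an int counting days since 1970-01-01.
            return Math.toIntExact(((LocalDate) value).toEpochDay());
        }
        return value;
    }

    /** Applies toRaw to every field value, preserving field order. */
    static Map<String, Object> toRawFields(Map<String, Object> fields) {
        Map<String, Object> out = new LinkedHashMap<>();
        for (Map.Entry<String, Object> e : fields.entrySet()) {
            out.put(e.getKey(), toRaw(e.getValue()));
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, Object> fields = new LinkedHashMap<>();
        fields.put("id", "abc");
        fields.put("createdAt", Instant.ofEpochMilli(1_696_000_000_000L));
        // prints {id=abc, createdAt=1696000000000}
        System.out.println(toRawFields(fields));
    }
}
```

In the pipeline above, the same conversion would be applied to each value before calling `put` on the `GenericRecord` in `ConsumedEventDeserializer`. The trade-off is that downstream code sees plain `long` / `int` values rather than `java.time` types.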

