clairemcginty opened a new issue, #33875:
URL: https://github.com/apache/beam/issues/33875

   ### What happened?
   
   I'm using Beam 2.62.0's BigQueryIO with [AvroFormatFunction/AvroSchemaFactory](https://github.com/apache/beam/blob/v2.62.0/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java#L2737-L2740) to write Avro `GenericRecord`s directly. However, when my records contain a nullable nested record type, the writes fail on Dataflow.
   
   Write setup (sorry for the mixed Scala code; this is ported from Scio):
   
   ```scala
   // Schemas
   val NestedAvroSchema = new Schema.Parser().parse(s"""
        |{
        |   "type": "record",
        |   "name": "NestedRecord",
        |   "namespace": "com.example",
        |   "fields": [ {"name": "id", "type": "int"} ]
        |}
         |""".stripMargin)
   
   val AvroSchema = new Schema.Parser().parse(s"""
         |{
         |    "type": "record",
         |    "name": "MyRecord",
         |    "namespace": "com.example",
         |    "fields": [
         |        {"name": "id", "type": "int"},
         |        {"name": "nestedField", "type": [
         |          "null",
         |          $NestedAvroSchema
         |        ]}
         |    ]
         |}
         |""".stripMargin)
   
   // Pipeline logic
   val (sc, args) = ContextAndArgs(cmdlineArgs) // creates a Pipeline
   
   sc
     .parallelize(1 to 10)
     .map(i =>
       new GenericRecordBuilder(AvroSchema)
         .set("id", i)
         .set(
           "nestedField",
           new GenericRecordBuilder(NestedAvroSchema)
             .set("id", i)
             .build()
         )
         .build()
         .asInstanceOf[GenericRecord]
     )
     .setCoder(org.apache.beam.sdk.extensions.avro.coders.AvroCoder.generic(AvroSchema))
     .saveAsCustomOutput(
       "Write to BQ",
       org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO
         .write[GenericRecord]()
         .to(<output table>)
         .withMethod(Method.DEFAULT)
         .withSchema(
           new TableSchema().setFields(
             ImmutableList.of(
               new TableFieldSchema().setName("id").setType("INTEGER").setMode("REQUIRED"),
               new TableFieldSchema()
                 .setName("nestedField")
                 .setType("RECORD")
                 .setFields(
                   ImmutableList.of(
                     new TableFieldSchema().setName("id").setType("INTEGER").setMode("REQUIRED")
                   )
                 )
                 .setMode("NULLABLE")
             )
           )
         )
         .withCreateDisposition(CREATE_IF_NEEDED)
         .withWriteDisposition(WRITE_TRUNCATE)
         .withAvroFormatFunction(Functions.serializableFn(_.getElement))
         .withAvroSchemaFactory(
           Functions.serializableFn(BigQueryUtils.toGenericAvroSchema(_, true))
         )
   )
   ```
   
   The job fails on Dataflow with the following stack trace:
   
   ```
   Error message from worker: org.apache.beam.sdk.util.UserCodeException: org.apache.avro.file.DataFileWriter$AppendWriteException: org.apache.avro.UnresolvedUnionException: Not in union ["null",{"type":"record","name":"nestedField","namespace":"org.apache.beam.sdk.io.gcp.bigquery","doc":"Translated Avro Schema for nestedField","fields":[{"name":"id","type":"long"}]}]: {"id": 1} (field=nestedField)
        org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
        org.apache.beam.sdk.io.gcp.bigquery.WriteBundlesToFiles$DoFnInvoker.invokeProcessElement(Unknown Source)
        org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForWindowObservingParDo(FnApiDoFnRunner.java:823)
        org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:348)
        org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:275)
        org.apache.beam.fn.harness.MapFnRunners$ExplodedWindowedValueMapperFactory.lambda$create$0(MapFnRunners.java:133)
        org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:348)
        org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:275)
        org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1837)
        org.apache.beam.fn.harness.FnApiDoFnRunner.access$3100(FnApiDoFnRunner.java:145)
        org.apache.beam.fn.harness.FnApiDoFnRunner$WindowObservingProcessBundleContext.output(FnApiDoFnRunner.java:2240)
        org.apache.beam.sdk.io.gcp.bigquery.PrepareWrite$1.processElement(PrepareWrite.java:84)
        org.apache.beam.sdk.io.gcp.bigquery.PrepareWrite$1$DoFnInvoker.invokeProcessElement(Unknown Source)
        org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForWindowObservingParDo(FnApiDoFnRunner.java:823)
        org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:348)
        org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:275)
        org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1837)
        org.apache.beam.fn.harness.FnApiDoFnRunner.access$3100(FnApiDoFnRunner.java:145)
        org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.output(FnApiDoFnRunner.java:2695)
        com.spotify.scio.util.Functions$$anon$8.processElement(Functions.scala:278)
        com.spotify.scio.util.Functions$$anon$8$DoFnInvoker.invokeProcessElement(Unknown Source)
        org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:810)
        org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:348)
        org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:275)
        org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1837)
        org.apache.beam.fn.harness.FnApiDoFnRunner.access$3100(FnApiDoFnRunner.java:145)
        org.apache.beam.fn.harness.FnApiDoFnRunner$WindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2264)
        org.apache.beam.sdk.io.Read$BoundedSourceAsSDFWrapperFn.processElement(Read.java:322)
        org.apache.beam.sdk.io.Read$BoundedSourceAsSDFWrapperFn$DoFnInvoker.invokeProcessElement(Unknown Source)
        org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForWindowObservingSizedElementAndRestriction(FnApiDoFnRunner.java:1131)
        org.apache.beam.fn.harness.FnApiDoFnRunner.access$1500(FnApiDoFnRunner.java:145)
        org.apache.beam.fn.harness.FnApiDoFnRunner$4.accept(FnApiDoFnRunner.java:666)
        org.apache.beam.fn.harness.FnApiDoFnRunner$4.accept(FnApiDoFnRunner.java:661)
        org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:348)
        org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:275)
        org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:213)
        org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.multiplexElements(BeamFnDataInboundObserver.java:172)
        org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:539)
        org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:150)
        org.apache.beam.fn.harness.control.BeamFnControlClient$InboundObserver.lambda$onNext$0(BeamFnControlClient.java:115)
        java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
        java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        org.apache.beam.sdk.util.UnboundedScheduledExecutorService$ScheduledFutureTask.run(UnboundedScheduledExecutorService.java:163)
        java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
        java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
        java.base/java.lang.Thread.run(Thread.java:840)
   Caused by: org.apache.avro.file.DataFileWriter$AppendWriteException: org.apache.avro.UnresolvedUnionException: Not in union ["null",{"type":"record","name":"nestedField","namespace":"org.apache.beam.sdk.io.gcp.bigquery","doc":"Translated Avro Schema for nestedField","fields":[{"name":"id","type":"long"}]}]: {"id": 1} (field=nestedField)
        org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:326)
        org.apache.beam.sdk.io.gcp.bigquery.AvroRowWriter.write(AvroRowWriter.java:58)
        org.apache.beam.sdk.io.gcp.bigquery.WriteBundlesToFiles.processElement(WriteBundlesToFiles.java:247)
   Caused by: org.apache.avro.UnresolvedUnionException: Not in union ["null",{"type":"record","name":"nestedField","namespace":"org.apache.beam.sdk.io.gcp.bigquery","doc":"Translated Avro Schema for nestedField","fields":[{"name":"id","type":"long"}]}]: {"id": 1} (field=nestedField)
        org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:247)
        org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:234)
        org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:145)
        org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:95)
        org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82)
        org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:323)
        org.apache.beam.sdk.io.gcp.bigquery.AvroRowWriter.write(AvroRowWriter.java:58)
        org.apache.beam.sdk.io.gcp.bigquery.WriteBundlesToFiles.processElement(WriteBundlesToFiles.java:247)
        org.apache.beam.sdk.io.gcp.bigquery.WriteBundlesToFiles$DoFnInvoker.invokeProcessElement(Unknown Source)
        org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForWindowObservingParDo(FnApiDoFnRunner.java:823)
        org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:348)
        org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:275)
        org.apache.beam.fn.harness.MapFnRunners$ExplodedWindowedValueMapperFactory.lambda$create$0(MapFnRunners.java:133)
        org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:348)
        org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:275)
        org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1837)
        org.apache.beam.fn.harness.FnApiDoFnRunner.access$3100(FnApiDoFnRunner.java:145)
        org.apache.beam.fn.harness.FnApiDoFnRunner$WindowObservingProcessBundleContext.output(FnApiDoFnRunner.java:2240)
        org.apache.beam.sdk.io.gcp.bigquery.PrepareWrite$1.processElement(PrepareWrite.java:84)
        org.apache.beam.sdk.io.gcp.bigquery.PrepareWrite$1$DoFnInvoker.invokeProcessElement(Unknown Source)
        org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForWindowObservingParDo(FnApiDoFnRunner.java:823)
        org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:348)
        org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:275)
        org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1837)
        org.apache.beam.fn.harness.FnApiDoFnRunner.access$3100(FnApiDoFnRunner.java:145)
        org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.output(FnApiDoFnRunner.java:2695)
        com.spotify.scio.util.Functions$$anon$8.processElement(Functions.scala:278)
        com.spotify.scio.util.Functions$$anon$8$DoFnInvoker.invokeProcessElement(Unknown Source)
        org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:810)
        org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:348)
        org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:275)
        org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1837)
        org.apache.beam.fn.harness.FnApiDoFnRunner.access$3100(FnApiDoFnRunner.java:145)
        org.apache.beam.fn.harness.FnApiDoFnRunner$WindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2264)
        org.apache.beam.sdk.io.Read$BoundedSourceAsSDFWrapperFn.processElement(Read.java:322)
        org.apache.beam.sdk.io.Read$BoundedSourceAsSDFWrapperFn$DoFnInvoker.invokeProcessElement(Unknown Source)
        org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForWindowObservingSizedElementAndRestriction(FnApiDoFnRunner.java:1131)
        org.apache.beam.fn.harness.FnApiDoFnRunner.access$1500(FnApiDoFnRunner.java:145)
        org.apache.beam.fn.harness.FnApiDoFnRunner$4.accept(FnApiDoFnRunner.java:666)
        org.apache.beam.fn.harness.FnApiDoFnRunner$4.accept(FnApiDoFnRunner.java:661)
        org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:348)
        org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:275)
        org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:213)
        org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.multiplexElements(BeamFnDataInboundObserver.java:172)
        org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:539)
        org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:150)
        org.apache.beam.fn.harness.control.BeamFnControlClient$InboundObserver.lambda$onNext$0(BeamFnControlClient.java:115)
        java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
        java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        org.apache.beam.sdk.util.UnboundedScheduledExecutorService$ScheduledFutureTask.run(UnboundedScheduledExecutorService.java:163)
        java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
        java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
        java.base/java.lang.Thread.run(Thread.java:840)
        Suppressed: org.apache.avro.UnresolvedUnionException: Not in union ["null",{"type":"record","name":"nestedField","namespace":"org.apache.beam.sdk.io.gcp.bigquery","doc":"Translated Avro Schema for nestedField","fields":[{"name":"id","type":"long"}]}]: {"id": 1}
                at org.apache.avro.generic.GenericData.resolveUnion(GenericData.java:910)
                at org.apache.avro.generic.GenericDatumWriter.resolveUnion(GenericDatumWriter.java:307)
                at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:157)
                at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:95)
                at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:245)
                ... 52 more
   ```
   
   This happens on both Avro 1.8 and 1.11.
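   A possible explanation, based only on the error message above. The Avro schema derived from the `TableSchema` names the nested record after the column (`org.apache.beam.sdk.io.gcp.bigquery.nestedField`), while the records are built with `com.example.NestedRecord`. Avro's `GenericData.resolveUnion` selects a union branch by the record's full name, not by its structure. The sketch below reproduces that lookup locally, without Beam or Dataflow, and assumes this name mismatch is the cause. `UnionNameMismatch`, `branchFor`, and the other names are hypothetical, and `bqNested` only approximates the schema Beam generates.

   ```scala
   import org.apache.avro.{Schema, SchemaBuilder, UnresolvedUnionException}
   import org.apache.avro.generic.{GenericData, GenericRecord, GenericRecordBuilder}

   object UnionNameMismatch {
     // Nested record as built in the pipeline: full name com.example.NestedRecord
     val userNested: Schema = SchemaBuilder
       .record("NestedRecord").namespace("com.example")
       .fields().requiredInt("id").endRecord()

     // Assumed approximation of the union branch shown in the error message:
     // full name org.apache.beam.sdk.io.gcp.bigquery.nestedField, "id" as long
     val bqNested: Schema = SchemaBuilder
       .record("nestedField").namespace("org.apache.beam.sdk.io.gcp.bigquery")
       .fields().requiredLong("id").endRecord()

     // ["null", nestedField], mirroring the NULLABLE RECORD column
     val bqUnion: Schema =
       Schema.createUnion(java.util.Arrays.asList(Schema.create(Schema.Type.NULL), bqNested))

     val userRecord: GenericRecord = new GenericRecordBuilder(userNested).set("id", 1).build()
     val beamRecord: GenericRecord = new GenericRecordBuilder(bqNested).set("id", 1L).build()

     // Index of the union branch Avro selects for `datum`, or None when
     // GenericData.resolveUnion throws UnresolvedUnionException.
     def branchFor(union: Schema, datum: AnyRef): Option[Int] =
       try Some(GenericData.get().resolveUnion(union, datum))
       catch { case _: UnresolvedUnionException => None }

     def main(args: Array[String]): Unit = {
       println(branchFor(bqUnion, userRecord)) // None: com.example.NestedRecord is not a branch
       println(branchFor(bqUnion, beamRecord)) // Some(1): full names match
       println(branchFor(bqUnion, null))       // Some(0): the "null" branch
     }
   }
   ```

   If this assumption holds, the `int`/`long` difference on `id` is not what triggers this particular exception. The union branch is rejected by name before any nested field is written.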
   
   ### Issue Priority
   
   Priority: 2 (default / most bugs should be filed as P2)
   
   ### Issue Components
   
   - [ ] Component: Python SDK
   - [x] Component: Java SDK
   - [ ] Component: Go SDK
   - [ ] Component: Typescript SDK
   - [x] Component: IO connector
   - [ ] Component: Beam YAML
   - [ ] Component: Beam examples
   - [ ] Component: Beam playground
   - [ ] Component: Beam katas
   - [ ] Component: Website
   - [ ] Component: Infrastructure
   - [ ] Component: Spark Runner
   - [ ] Component: Flink Runner
   - [ ] Component: Samza Runner
   - [ ] Component: Twister2 Runner
   - [ ] Component: Hazelcast Jet Runner
   - [ ] Component: Google Cloud Dataflow Runner

