clairemcginty opened a new issue, #33875: URL: https://github.com/apache/beam/issues/33875
### What happened?

I'm writing to BigQuery with Beam 2.62.0's BigQueryIO, using [AvroFormatFunction/AvroSchemaFactory](https://github.com/apache/beam/blob/v2.62.0/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java#L2737-L2740) to write Avro `GenericRecord`s directly. However, when my records contain a nullable nested record type, the writes fail on Dataflow.

Write setup (sorry for the mixed Scala code; this is ported from Scio):

```scala
// Schemas
val NestedAvroSchema = new Schema.Parser().parse(s"""
  |{
  |  "type": "record",
  |  "name": "NestedRecord",
  |  "namespace": "com.example",
  |  "fields": [ {"name": "id", "type": "int"} ]
  |}
  |""".stripMargin)

val AvroSchema = new Schema.Parser().parse(s"""
  |{
  |  "type": "record",
  |  "name": "MyRecord",
  |  "namespace": "com.example",
  |  "fields": [
  |    {"name": "id", "type": "int"},
  |    {"name": "nestedField", "type": [
  |      "null",
  |      $NestedAvroSchema
  |    ]}
  |  ]
  |}
  |""".stripMargin)

// Pipeline logic
val (sc, args) = ContextAndArgs(cmdlineArgs) // creates a Pipeline
sc
  .parallelize(1 to 10)
  .map(i =>
    new GenericRecordBuilder(AvroSchema)
      .set("id", i)
      .set(
        "nestedField",
        new GenericRecordBuilder(NestedAvroSchema)
          .set("id", i)
          .build()
      )
      .build()
      .asInstanceOf[GenericRecord]
  )
  .setCoder(org.apache.beam.sdk.extensions.avro.coders.AvroCoder.generic(AvroSchema))
  .saveAsCustomOutput(
    "Write to BQ",
    org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO
      .write[GenericRecord]()
      .to(<output table>)
      .withMethod(Method.DEFAULT)
      .withSchema(
        new TableSchema().setFields(
          ImmutableList.of(
            new TableFieldSchema().setName("id").setType("INTEGER").setMode("REQUIRED"),
            new TableFieldSchema()
              .setName("nestedField")
              .setType("RECORD")
              .setFields(
                ImmutableList.of(
                  new TableFieldSchema().setName("id").setType("INTEGER").setMode("REQUIRED")
                )
              )
              .setMode("NULLABLE")
          )
        )
      )
      .withCreateDisposition(CREATE_IF_NEEDED)
      .withWriteDisposition(WRITE_TRUNCATE)
      .withAvroFormatFunction(Functions.serializableFn(_.getElement))
      .withAvroSchemaFactory(
        Functions.serializableFn(BigQueryUtils.toGenericAvroSchema(_, true))
      )
  )
```

The job fails on Dataflow with the following stack trace:

```
Error message from worker: org.apache.beam.sdk.util.UserCodeException: org.apache.avro.file.DataFileWriter$AppendWriteException: org.apache.avro.UnresolvedUnionException: Not in union ["null",{"type":"record","name":"nestedField","namespace":"org.apache.beam.sdk.io.gcp.bigquery","doc":"Translated Avro Schema for nestedField","fields":[{"name":"id","type":"long"}]}]: {"id": 1} (field=nestedField)
    org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
    org.apache.beam.sdk.io.gcp.bigquery.WriteBundlesToFiles$DoFnInvoker.invokeProcessElement(Unknown Source)
    org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForWindowObservingParDo(FnApiDoFnRunner.java:823)
    org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:348)
    org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:275)
    org.apache.beam.fn.harness.MapFnRunners$ExplodedWindowedValueMapperFactory.lambda$create$0(MapFnRunners.java:133)
    org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:348)
    org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:275)
    org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1837)
    org.apache.beam.fn.harness.FnApiDoFnRunner.access$3100(FnApiDoFnRunner.java:145)
    org.apache.beam.fn.harness.FnApiDoFnRunner$WindowObservingProcessBundleContext.output(FnApiDoFnRunner.java:2240)
    org.apache.beam.sdk.io.gcp.bigquery.PrepareWrite$1.processElement(PrepareWrite.java:84)
    org.apache.beam.sdk.io.gcp.bigquery.PrepareWrite$1$DoFnInvoker.invokeProcessElement(Unknown Source)
    org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForWindowObservingParDo(FnApiDoFnRunner.java:823)
    org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:348)
    org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:275)
    org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1837)
    org.apache.beam.fn.harness.FnApiDoFnRunner.access$3100(FnApiDoFnRunner.java:145)
    org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.output(FnApiDoFnRunner.java:2695)
    com.spotify.scio.util.Functions$$anon$8.processElement(Functions.scala:278)
    com.spotify.scio.util.Functions$$anon$8$DoFnInvoker.invokeProcessElement(Unknown Source)
    org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:810)
    org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:348)
    org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:275)
    org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1837)
    org.apache.beam.fn.harness.FnApiDoFnRunner.access$3100(FnApiDoFnRunner.java:145)
    org.apache.beam.fn.harness.FnApiDoFnRunner$WindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2264)
    org.apache.beam.sdk.io.Read$BoundedSourceAsSDFWrapperFn.processElement(Read.java:322)
    org.apache.beam.sdk.io.Read$BoundedSourceAsSDFWrapperFn$DoFnInvoker.invokeProcessElement(Unknown Source)
    org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForWindowObservingSizedElementAndRestriction(FnApiDoFnRunner.java:1131)
    org.apache.beam.fn.harness.FnApiDoFnRunner.access$1500(FnApiDoFnRunner.java:145)
    org.apache.beam.fn.harness.FnApiDoFnRunner$4.accept(FnApiDoFnRunner.java:666)
    org.apache.beam.fn.harness.FnApiDoFnRunner$4.accept(FnApiDoFnRunner.java:661)
    org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:348)
    org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:275)
    org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:213)
    org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.multiplexElements(BeamFnDataInboundObserver.java:172)
    org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:539)
    org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:150)
    org.apache.beam.fn.harness.control.BeamFnControlClient$InboundObserver.lambda$onNext$0(BeamFnControlClient.java:115)
    java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
    java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    org.apache.beam.sdk.util.UnboundedScheduledExecutorService$ScheduledFutureTask.run(UnboundedScheduledExecutorService.java:163)
    java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    java.base/java.lang.Thread.run(Thread.java:840)
Caused by: org.apache.avro.file.DataFileWriter$AppendWriteException: org.apache.avro.UnresolvedUnionException: Not in union ["null",{"type":"record","name":"nestedField","namespace":"org.apache.beam.sdk.io.gcp.bigquery","doc":"Translated Avro Schema for nestedField","fields":[{"name":"id","type":"long"}]}]: {"id": 1} (field=nestedField)
    org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:326)
    org.apache.beam.sdk.io.gcp.bigquery.AvroRowWriter.write(AvroRowWriter.java:58)
    org.apache.beam.sdk.io.gcp.bigquery.WriteBundlesToFiles.processElement(WriteBundlesToFiles.java:247)
Caused by: org.apache.avro.UnresolvedUnionException: Not in union ["null",{"type":"record","name":"nestedField","namespace":"org.apache.beam.sdk.io.gcp.bigquery","doc":"Translated Avro Schema for nestedField","fields":[{"name":"id","type":"long"}]}]: {"id": 1} (field=nestedField)
    org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:247)
    org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:234)
    org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:145)
    org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:95)
    org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82)
    org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:323)
    org.apache.beam.sdk.io.gcp.bigquery.AvroRowWriter.write(AvroRowWriter.java:58)
    org.apache.beam.sdk.io.gcp.bigquery.WriteBundlesToFiles.processElement(WriteBundlesToFiles.java:247)
    org.apache.beam.sdk.io.gcp.bigquery.WriteBundlesToFiles$DoFnInvoker.invokeProcessElement(Unknown Source)
    org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForWindowObservingParDo(FnApiDoFnRunner.java:823)
    org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:348)
    org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:275)
    org.apache.beam.fn.harness.MapFnRunners$ExplodedWindowedValueMapperFactory.lambda$create$0(MapFnRunners.java:133)
    org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:348)
    org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:275)
    org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1837)
    org.apache.beam.fn.harness.FnApiDoFnRunner.access$3100(FnApiDoFnRunner.java:145)
    org.apache.beam.fn.harness.FnApiDoFnRunner$WindowObservingProcessBundleContext.output(FnApiDoFnRunner.java:2240)
    org.apache.beam.sdk.io.gcp.bigquery.PrepareWrite$1.processElement(PrepareWrite.java:84)
    org.apache.beam.sdk.io.gcp.bigquery.PrepareWrite$1$DoFnInvoker.invokeProcessElement(Unknown Source)
    org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForWindowObservingParDo(FnApiDoFnRunner.java:823)
    org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:348)
    org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:275)
    org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1837)
    org.apache.beam.fn.harness.FnApiDoFnRunner.access$3100(FnApiDoFnRunner.java:145)
    org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.output(FnApiDoFnRunner.java:2695)
    com.spotify.scio.util.Functions$$anon$8.processElement(Functions.scala:278)
    com.spotify.scio.util.Functions$$anon$8$DoFnInvoker.invokeProcessElement(Unknown Source)
    org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:810)
    org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:348)
    org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:275)
    org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1837)
    org.apache.beam.fn.harness.FnApiDoFnRunner.access$3100(FnApiDoFnRunner.java:145)
    org.apache.beam.fn.harness.FnApiDoFnRunner$WindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2264)
    org.apache.beam.sdk.io.Read$BoundedSourceAsSDFWrapperFn.processElement(Read.java:322)
    org.apache.beam.sdk.io.Read$BoundedSourceAsSDFWrapperFn$DoFnInvoker.invokeProcessElement(Unknown Source)
    org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForWindowObservingSizedElementAndRestriction(FnApiDoFnRunner.java:1131)
    org.apache.beam.fn.harness.FnApiDoFnRunner.access$1500(FnApiDoFnRunner.java:145)
    org.apache.beam.fn.harness.FnApiDoFnRunner$4.accept(FnApiDoFnRunner.java:666)
    org.apache.beam.fn.harness.FnApiDoFnRunner$4.accept(FnApiDoFnRunner.java:661)
    org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:348)
    org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:275)
    org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:213)
    org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.multiplexElements(BeamFnDataInboundObserver.java:172)
    org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:539)
    org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:150)
    org.apache.beam.fn.harness.control.BeamFnControlClient$InboundObserver.lambda$onNext$0(BeamFnControlClient.java:115)
    java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
    java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    org.apache.beam.sdk.util.UnboundedScheduledExecutorService$ScheduledFutureTask.run(UnboundedScheduledExecutorService.java:163)
    java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    java.base/java.lang.Thread.run(Thread.java:840)
    Suppressed: org.apache.avro.UnresolvedUnionException: Not in union ["null",{"type":"record","name":"nestedField","namespace":"org.apache.beam.sdk.io.gcp.bigquery","doc":"Translated Avro Schema for nestedField","fields":[{"name":"id","type":"long"}]}]: {"id": 1}
        at org.apache.avro.generic.GenericData.resolveUnion(GenericData.java:910)
        at org.apache.avro.generic.GenericDatumWriter.resolveUnion(GenericDatumWriter.java:307)
        at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:157)
        at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:95)
        at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:245)
        ... 52 more
```

This happens on both Avro 1.8 and 1.11.

### Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

### Issue Components

- [ ] Component: Python SDK
- [x] Component: Java SDK
- [ ] Component: Go SDK
- [ ] Component: Typescript SDK
- [x] Component: IO connector
- [ ] Component: Beam YAML
- [ ] Component: Beam examples
- [ ] Component: Beam playground
- [ ] Component: Beam katas
- [ ] Component: Website
- [ ] Component: Infrastructure
- [ ] Component: Spark Runner
- [ ] Component: Flink Runner
- [ ] Component: Samza Runner
- [ ] Component: Twister2 Runner
- [ ] Component: Hazelcast Jet Runner
- [ ] Component: Google Cloud Dataflow Runner
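Reading the stack trace, the root `UnresolvedUnionException` appears to come from how Avro resolves unions: `GenericDatumWriter.resolveUnion` delegates to `GenericData.resolveUnion`, which looks up the union branch by the datum's schema *full name*. The records are built against `com.example.NestedRecord`, while the writer schema produced by the `AvroSchemaFactory` names the branch `org.apache.beam.sdk.io.gcp.bigquery.nestedField`, so no branch matches even though both records have a single `id` field. The sketch below tries to isolate just that step with plain Avro (no Beam or Dataflow), assuming Avro is on the classpath. The union schema is copied from the error message minus its `doc` attribute; `UnionResolutionRepro`, `resolves`, and `userUnion` are hypothetical names used only for illustration.

```scala
import org.apache.avro.{Schema, UnresolvedUnionException}
import org.apache.avro.generic.{GenericData, GenericRecord, GenericRecordBuilder}

// Hypothetical standalone repro; not part of Beam or Scio.
object UnionResolutionRepro {
  // The nested record as built in the pipeline: full name com.example.NestedRecord.
  val userNested: Schema = new Schema.Parser().parse(
    """{"type": "record", "name": "NestedRecord", "namespace": "com.example",
      | "fields": [{"name": "id", "type": "int"}]}""".stripMargin)

  // The nullable union from the stack trace: its record branch has a different
  // full name, org.apache.beam.sdk.io.gcp.bigquery.nestedField.
  val translatedUnion: Schema = new Schema.Parser().parse(
    """["null", {"type": "record", "name": "nestedField",
      | "namespace": "org.apache.beam.sdk.io.gcp.bigquery",
      | "fields": [{"name": "id", "type": "long"}]}]""".stripMargin)

  // Same union shape, but with the user's own record schema as the non-null branch.
  val userUnion: Schema =
    Schema.createUnion(Schema.create(Schema.Type.NULL), userNested)

  // True if GenericData can pick a union branch for this datum, false if it
  // throws UnresolvedUnionException (the exception seen in the Dataflow logs).
  def resolves(union: Schema, datum: GenericRecord): Boolean =
    try {
      GenericData.get().resolveUnion(union, datum)
      true
    } catch {
      case _: UnresolvedUnionException => false
    }

  def main(args: Array[String]): Unit = {
    val datum = new GenericRecordBuilder(userNested).set("id", 1).build()
    println(s"translated union resolves: ${resolves(translatedUnion, datum)}")
    println(s"user union resolves:       ${resolves(userUnion, datum)}")
  }
}
```

If this reading is right, `main` should report `false` for the translated union and `true` for the union built from the record's own schema, i.e. the mismatch lies in the record names the schema factory generates rather than in the data itself.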
