bvolpato opened a new issue, #25122: URL: https://github.com/apache/beam/issues/25122
### What happened?

BigQueryIO has a coder issue on a very specific code path that combines `withFormatFunction()` with `getFailedInsertsWithErr()`. The issue is only exposed when using Dataflow Runner v2.

In short, I have a `PCollection<byte[]>` that gets written to BigQueryIO (through Streaming Inserts) and uses `.withFormatFunction()` to convert each element into a `TableRow`. Somehow, when `withExtendedErrorInfo` is enabled and `getFailedInsertsWithErr` is consumed to pipe the failed rows into another `BigQueryIO`, the underlying coder is still `ByteArrayCoder` (the original input's coder, not a `TableRow` coder), and `FnApiDoFnRunner` cannot consume the affected `TableRow`s correctly.

----

Code to reproduce: https://gist.github.com/bvolpato/a5f3f1f44071eafc1034935bc4fffbae

Cause:

```
java.lang.ClassCastException: class com.google.api.services.bigquery.model.TableRow cannot be cast to class [B (com.google.api.services.bigquery.model.TableRow is in unnamed module of loader 'app'; [B is in module java.base of loader 'bootstrap')
    at org.apache.beam.sdk.coders.ByteArrayCoder.encode(ByteArrayCoder.java:41)
    at org.apache.beam.sdk.io.gcp.bigquery.TableRowInfoCoder.encode(TableRowInfoCoder.java:53)
    at org.apache.beam.sdk.io.gcp.bigquery.TableRowInfoCoder.encode(TableRowInfoCoder.java:44)
    at org.apache.beam.sdk.io.gcp.bigquery.TableRowInfoCoder.encode(TableRowInfoCoder.java:30)
```

Full stack trace:

```
ERROR [main] (MonitoringUtil.java:84) - 2023-01-23T17:44:23.179Z: generic::unknown: org.apache.beam.sdk.util.UserCodeException: java.lang.IllegalArgumentException: Unable to encode element 'org.apache.beam.sdk.io.gcp.bigquery.TableRowInfo@1b992c76' with coder 'TableRowInfoCoder'.
    at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1791)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.access$3000(FnApiDoFnRunner.java:143)
    at org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2357)
    at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2527)
    at org.apache.beam.sdk.io.gcp.bigquery.TagWithUniqueIds.processElement(TagWithUniqueIds.java:68)
    at org.apache.beam.sdk.io.gcp.bigquery.TagWithUniqueIds$DoFnInvoker.invokeProcessElement(Unknown Source)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:800)
    at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:313)
    at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:245)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1789)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.access$3000(FnApiDoFnRunner.java:143)
    at org.apache.beam.fn.harness.FnApiDoFnRunner$WindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2240)
    at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2527)
    at org.apache.beam.sdk.io.gcp.bigquery.GenerateShardedTable.processElement(GenerateShardedTable.java:44)
    at org.apache.beam.sdk.io.gcp.bigquery.GenerateShardedTable$DoFnInvoker.invokeProcessElement(Unknown Source)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForWindowObservingParDo(FnApiDoFnRunner.java:813)
    at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:313)
    at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:245)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1789)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.access$3000(FnApiDoFnRunner.java:143)
    at org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2357)
    at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2527)
    at org.apache.beam.sdk.io.gcp.bigquery.CreateTables$CreateTablesFn.processElement(CreateTables.java:126)
    at org.apache.beam.sdk.io.gcp.bigquery.CreateTables$CreateTablesFn$DoFnInvoker.invokeProcessElement(Unknown Source)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:800)
    at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:313)
    at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:245)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1789)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.access$3000(FnApiDoFnRunner.java:143)
    at org.apache.beam.fn.harness.FnApiDoFnRunner$WindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2240)
    at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2527)
    at org.apache.beam.sdk.io.gcp.bigquery.PrepareWrite$1.processElement(PrepareWrite.java:84)
    at org.apache.beam.sdk.io.gcp.bigquery.PrepareWrite$1$DoFnInvoker.invokeProcessElement(Unknown Source)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForWindowObservingParDo(FnApiDoFnRunner.java:813)
    at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:313)
    at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:245)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1789)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.access$3000(FnApiDoFnRunner.java:143)
    at org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2357)
    at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2527)
    at org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:78)
    at com.sample.CreateByteToBigQueryDLQ$1.processElement(CreateByteToBigQueryDLQ.java:104)
    at com.sample.CreateByteToBigQueryDLQ$1$DoFnInvoker.invokeProcessElement(Unknown Source)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:800)
    at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:313)
    at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:245)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1789)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.access$3000(FnApiDoFnRunner.java:143)
    at org.apache.beam.fn.harness.FnApiDoFnRunner$FinishBundleArgumentProvider$Context.output(FnApiDoFnRunner.java:2139)
    at org.apache.beam.sdk.io.gcp.bigquery.BatchedStreamingWrite$BatchAndInsertElements.finishBundle(BatchedStreamingWrite.java:298)
    at org.apache.beam.sdk.io.gcp.bigquery.BatchedStreamingWrite$BatchAndInsertElements$DoFnInvoker.invokeFinishBundle(Unknown Source)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.finishBundle(FnApiDoFnRunner.java:1773)
    at org.apache.beam.fn.harness.data.PTransformFunctionRegistry.lambda$register$0(PTransformFunctionRegistry.java:133)
    at org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:556)
    at org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:151)
    at org.apache.beam.fn.harness.control.BeamFnControlClient$InboundObserver.lambda$onNext$0(BeamFnControlClient.java:116)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at org.apache.beam.sdk.util.UnboundedScheduledExecutorService$ScheduledFutureTask.run(UnboundedScheduledExecutorService.java:162)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.IllegalArgumentException: Unable to encode element 'org.apache.beam.sdk.io.gcp.bigquery.TableRowInfo@1b992c76' with coder 'TableRowInfoCoder'.
    at org.apache.beam.sdk.coders.Coder.getEncodedElementByteSize(Coder.java:300)
    at org.apache.beam.sdk.coders.Coder.registerByteSizeObserver(Coder.java:291)
    at org.apache.beam.sdk.coders.KvCoder.registerByteSizeObserver(KvCoder.java:130)
    at org.apache.beam.sdk.coders.KvCoder.registerByteSizeObserver(KvCoder.java:37)
    at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$SampleByteSizeDistribution.tryUpdate(PCollectionConsumerRegistry.java:461)
    at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:304)
    at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:245)
Caused by: java.lang.ClassCastException: class com.google.api.services.bigquery.model.TableRow cannot be cast to class [B (com.google.api.services.bigquery.model.TableRow is in unnamed module of loader 'app'; [B is in module java.base of loader 'bootstrap')
    at org.apache.beam.sdk.coders.ByteArrayCoder.encode(ByteArrayCoder.java:41)
    at org.apache.beam.sdk.io.gcp.bigquery.TableRowInfoCoder.encode(TableRowInfoCoder.java:53)
    at org.apache.beam.sdk.io.gcp.bigquery.TableRowInfoCoder.encode(TableRowInfoCoder.java:44)
    at org.apache.beam.sdk.io.gcp.bigquery.TableRowInfoCoder.encode(TableRowInfoCoder.java:30)
    at org.apache.beam.sdk.coders.Coder.getEncodedElementByteSize(Coder.java:297)
    at org.apache.beam.sdk.coders.Coder.registerByteSizeObserver(Coder.java:291)
    at org.apache.beam.sdk.coders.KvCoder.registerByteSizeObserver(KvCoder.java:130)
    at org.apache.beam.sdk.coders.KvCoder.registerByteSizeObserver(KvCoder.java:37)
    at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$SampleByteSizeDistribution.tryUpdate(PCollectionConsumerRegistry.java:461)
    at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:304)
    at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:245)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1789)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.access$3000(FnApiDoFnRunner.java:143)
    at org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2357)
    at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2527)
    at org.apache.beam.sdk.io.gcp.bigquery.TagWithUniqueIds.processElement(TagWithUniqueIds.java:68)
    at org.apache.beam.sdk.io.gcp.bigquery.TagWithUniqueIds$DoFnInvoker.invokeProcessElement(Unknown Source)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:800)
    at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:313)
    at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:245)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1789)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.access$3000(FnApiDoFnRunner.java:143)
    at org.apache.beam.fn.harness.FnApiDoFnRunner$WindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2240)
    at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2527)
    at org.apache.beam.sdk.io.gcp.bigquery.GenerateShardedTable.processElement(GenerateShardedTable.java:44)
    at org.apache.beam.sdk.io.gcp.bigquery.GenerateShardedTable$DoFnInvoker.invokeProcessElement(Unknown Source)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForWindowObservingParDo(FnApiDoFnRunner.java:813)
    at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:313)
    at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:245)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1789)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.access$3000(FnApiDoFnRunner.java:143)
    at org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2357)
    at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2527)
    at org.apache.beam.sdk.io.gcp.bigquery.CreateTables$CreateTablesFn.processElement(CreateTables.java:126)
    at org.apache.beam.sdk.io.gcp.bigquery.CreateTables$CreateTablesFn$DoFnInvoker.invokeProcessElement(Unknown Source)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:800)
    at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:313)
    at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:245)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1789)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.access$3000(FnApiDoFnRunner.java:143)
    at org.apache.beam.fn.harness.FnApiDoFnRunner$WindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2240)
    at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2527)
    at org.apache.beam.sdk.io.gcp.bigquery.PrepareWrite$1.processElement(PrepareWrite.java:84)
    at org.apache.beam.sdk.io.gcp.bigquery.PrepareWrite$1$DoFnInvoker.invokeProcessElement(Unknown Source)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForWindowObservingParDo(FnApiDoFnRunner.java:813)
    at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:313)
    at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:245)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1789)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.access$3000(FnApiDoFnRunner.java:143)
    at org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2357)
    at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2527)
    at org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:78)
    at com.sample.CreateByteToBigQueryDLQ$1.processElement(CreateByteToBigQueryDLQ.java:104)
    at com.sample.CreateByteToBigQueryDLQ$1$DoFnInvoker.invokeProcessElement(Unknown Source)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:800)
    at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:313)
    at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:245)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1789)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.access$3000(FnApiDoFnRunner.java:143)
    at org.apache.beam.fn.harness.FnApiDoFnRunner$FinishBundleArgumentProvider$Context.output(FnApiDoFnRunner.java:2139)
    at org.apache.beam.sdk.io.gcp.bigquery.BatchedStreamingWrite$BatchAndInsertElements.finishBundle(BatchedStreamingWrite.java:298)
    at org.apache.beam.sdk.io.gcp.bigquery.BatchedStreamingWrite$BatchAndInsertElements$DoFnInvoker.invokeFinishBundle(Unknown Source)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.finishBundle(FnApiDoFnRunner.java:1773)
    at org.apache.beam.fn.harness.data.PTransformFunctionRegistry.lambda$register$0(PTransformFunctionRegistry.java:133)
    at org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:556)
    at org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:151)
    at org.apache.beam.fn.harness.control.BeamFnControlClient$InboundObserver.lambda$onNext$0(BeamFnControlClient.java:116)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at org.apache.beam.sdk.util.UnboundedScheduledExecutorService$ScheduledFutureTask.run(UnboundedScheduledExecutorService.java:162)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
```

### Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

### Issue Components

- [ ] Component: Python SDK
- [X] Component: Java SDK
- [ ] Component: Go SDK
- [ ] Component: Typescript SDK
- [X] Component: IO connector
- [ ] Component: Beam examples
- [ ] Component: Beam playground
- [ ] Component: Beam katas
- [ ] Component: Website
- [ ] Component: Spark Runner
- [ ] Component: Flink Runner
- [ ] Component: Samza Runner
- [ ] Component: Twister2 Runner
- [ ] Component: Hazelcast Jet Runner
- [X] Component: Google Cloud Dataflow Runner
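
### Illustrative sketch of the failure mode

To make the suspected mechanism concrete, here is a minimal plain-Java sketch. It does not use Beam: `SimpleCoder`, `BytesCoder`, and `RowInfoCoder` are hypothetical stand-ins for `Coder`, `ByteArrayCoder`, and `TableRowInfoCoder`, and a `String` stands in for the formatted `TableRow`. It only assumes what is described above, namely that the wrapping coder is built from the input element's coder while the element it is asked to encode has already been converted by the format function.

```java
import java.io.ByteArrayOutputStream;

public class CoderMismatchSketch {

  /** Hypothetical stand-in for a Beam coder; not the real Coder API. */
  interface SimpleCoder<T> {
    void encode(T value, ByteArrayOutputStream out);
  }

  /** Plays the role of ByteArrayCoder: it can only encode byte[]. */
  static final class BytesCoder implements SimpleCoder<byte[]> {
    @Override
    public void encode(byte[] value, ByteArrayOutputStream out) {
      out.write(value, 0, value.length);
    }
  }

  /** Plays the role of TableRowInfoCoder: delegates to an element coder. */
  static final class RowInfoCoder<T> implements SimpleCoder<T> {
    private final SimpleCoder<T> elementCoder;

    RowInfoCoder(SimpleCoder<T> elementCoder) {
      this.elementCoder = elementCoder;
    }

    @Override
    public void encode(T value, ByteArrayOutputStream out) {
      elementCoder.encode(value, out);
    }
  }

  /** Returns "encoded" on success, or the name of the failure otherwise. */
  @SuppressWarnings({"unchecked", "rawtypes"})
  static String encodeFormattedRowWithInputCoder() {
    // The wrapper is built from the *input* coder (byte[]) ...
    SimpleCoder wrapper = new RowInfoCoder<>(new BytesCoder());
    // ... but the element was already converted by the format function.
    Object formattedRow = "{\"payload\": \"hello\"}";
    try {
      // Generics are erased at runtime, so the mismatch only surfaces here,
      // when the byte[] coder casts its argument.
      wrapper.encode(formattedRow, new ByteArrayOutputStream());
      return "encoded";
    } catch (ClassCastException e) {
      return "ClassCastException";
    }
  }

  public static void main(String[] args) {
    System.out.println(encodeFormattedRowWithInputCoder());
  }
}
```

Because the type parameters are erased, nothing fails when the coder is constructed. The cast fails only when an element is actually encoded, which would be consistent with the `ClassCastException` surfacing inside `ByteArrayCoder.encode` in the trace above.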
